diff --git a/include/fastdds/dds/subscriber/SampleInfo.hpp b/include/fastdds/dds/subscriber/SampleInfo.hpp index 7af134ba44e..8f9e5ba8da5 100644 --- a/include/fastdds/dds/subscriber/SampleInfo.hpp +++ b/include/fastdds/dds/subscriber/SampleInfo.hpp @@ -89,7 +89,7 @@ struct SampleInfo //! the generation difference between the time the sample was received, and the time the most recent sample was received. //! The most recent sample used for the calculation may or may not be in the returned collection - int32_t absoulte_generation_rank; + int32_t absolute_generation_rank; //! time provided by the DataWriter when the sample was written fastrtps::rtps::Time_t source_timestamp; diff --git a/include/fastdds/rtps/common/CacheChange.h b/include/fastdds/rtps/common/CacheChange.h index 556149ad8a9..1f3c260ca8a 100644 --- a/include/fastdds/rtps/common/CacheChange.h +++ b/include/fastdds/rtps/common/CacheChange.h @@ -57,6 +57,10 @@ struct CacheChangeReaderInfo_t { //!Reception TimeStamp (only used in Readers) Time_t receptionTimestamp; + //! Disposed generation of the instance when this entry was added to it + int32_t disposed_generation_count; + //! No-writers generation of the instance when this entry was added to it + int32_t no_writers_generation_count; }; /** diff --git a/include/fastdds/rtps/history/ReaderHistory.h b/include/fastdds/rtps/history/ReaderHistory.h index 2d83ca02bbd..eb2a9fa6c39 100644 --- a/include/fastdds/rtps/history/ReaderHistory.h +++ b/include/fastdds/rtps/history/ReaderHistory.h @@ -152,6 +152,19 @@ class ReaderHistory : public History CacheChange_t** min_change, const GUID_t& writerGuid); + /** + * Called when a writer is unmatched from the reader holding this history. + * + * This method will remove all the changes on the history that came from the writer being unmatched and which have + * not yet been notified to the user. + * + * @param writer_guid GUID of the writer being unmatched. + * @param last_notified_seq Last sequence number from the specified writer that was notified to the user. + */ + RTPS_DllAPI virtual void writer_unmatched( + const GUID_t& writer_guid, + const SequenceNumber_t& last_notified_seq); + protected: RTPS_DllAPI bool do_reserve_cache( @@ -161,6 +174,21 @@ class ReaderHistory : public History RTPS_DllAPI void do_release_cache( CacheChange_t* ch) override; + template + inline void remove_changes_with_pred( + Pred pred) + { + assert(nullptr != mp_reader); + assert(nullptr != mp_mutex); + + std::lock_guard guard(*mp_mutex); + std::vector::iterator new_end = std::remove_if(m_changes.begin(), m_changes.end(), pred); + while (new_end != m_changes.end()) + { + new_end = remove_change_nts(new_end); + } + } + //!Pointer to the reader RTPSReader* mp_reader; diff --git a/include/fastrtps/subscriber/SubscriberHistory.h b/include/fastrtps/subscriber/SubscriberHistory.h index d0d1f1fdf2b..be94ed6a291 100644 --- a/include/fastrtps/subscriber/SubscriberHistory.h +++ b/include/fastrtps/subscriber/SubscriberHistory.h @@ -44,8 +44,6 @@ class SubscriberHistory : public rtps::ReaderHistory { public: - using instance_info = std::pair*>; - /** * Constructor. Requires information about the subscriber. * @param topic_att TopicAttributes. @@ -179,25 +177,6 @@ class SubscriberHistory : public rtps::ReaderHistory rtps::InstanceHandle_t& handle, std::chrono::steady_clock::time_point& next_deadline_us); - /** - * @brief Get the list of changes corresponding to an instance handle. - * @param handle The handle to the instance. - * @param exact Indicates if the handle should match exactly (true) or if the first instance greater than the - * input handle should be returned. - * @return A pair where: - * - @c first is a boolean indicating if an instance was found - * - @c second is a pair where: - * - @c first is the handle of the returned instance - * - @c second is a pointer to a std::vector with the list of changes for the - * returned instance - * - * @remarks When used on a NO_KEY topic, an instance will only be returned when called with - * `handle = HANDLE_NIL` and `exact = false`. - */ - std::pair lookup_instance( - const rtps::InstanceHandle_t& handle, - bool exact); - private: using t_m_Inst_Caches = std::map; diff --git a/include/fastrtps/utils/collections/ResourceLimitedVector.hpp b/include/fastrtps/utils/collections/ResourceLimitedVector.hpp index 91aa80a3c12..cafe1291053 100644 --- a/include/fastrtps/utils/collections/ResourceLimitedVector.hpp +++ b/include/fastrtps/utils/collections/ResourceLimitedVector.hpp @@ -119,6 +119,48 @@ class ResourceLimitedVector return *this; } + /** + * Insert value before pos. + * + * @param pos iterator before which the content will be inserted. pos may be the end() iterator. + * @param value element value to insert. + * + * @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits. + */ + iterator insert( + const_iterator pos, + const value_type& value) + { + auto dist = std::distance(collection_.cbegin(), pos); + if (!ensure_capacity()) + { + return end(); + } + + return collection_.insert(collection_.cbegin() + dist, value); + } + + /** + * Insert value before pos. + * + * @param pos iterator before which the content will be inserted. pos may be the end() iterator. + * @param value element value to insert. + * + * @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits. + */ + iterator insert( + const_iterator pos, + value_type&& value) + { + auto dist = std::distance(collection_.cbegin(), pos); + if (!ensure_capacity()) + { + return end(); + } + + return collection_.insert(collection_.cbegin() + dist, std::move(value)); + } + /** * Add element at the end. * diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index e8e38f74927..53b987cfeb8 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -85,9 +85,7 @@ set(${PROJECT_NAME}_source_files fastrtps_deprecated/subscriber/Subscriber.cpp fastrtps_deprecated/subscriber/SubscriberImpl.cpp fastrtps_deprecated/subscriber/SubscriberHistory.cpp - fastdds/subscriber/DataReader.cpp fastdds/publisher/DataWriter.cpp - fastdds/subscriber/DataReaderImpl.cpp fastdds/publisher/DataWriterImpl.cpp fastdds/topic/ContentFilteredTopic.cpp fastdds/topic/Topic.cpp @@ -105,6 +103,7 @@ set(${PROJECT_NAME}_source_files fastdds/subscriber/Subscriber.cpp fastdds/subscriber/DataReader.cpp fastdds/subscriber/DataReaderImpl.cpp + fastdds/subscriber/history/DataReaderHistory.cpp fastdds/domain/DomainParticipantFactory.cpp fastdds/domain/DomainParticipantImpl.cpp fastdds/domain/DomainParticipant.cpp diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 61c7eaf8dc6..c013ac36b94 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -62,40 +62,6 @@ namespace eprosima { namespace fastdds { namespace dds { -static void sample_info_to_dds ( - const SampleInfo_t& rtps_info, - SampleInfo* dds_info) -{ - dds_info->sample_state = NOT_READ_SAMPLE_STATE; - dds_info->view_state = NOT_NEW_VIEW_STATE; - dds_info->disposed_generation_count = 0; - dds_info->no_writers_generation_count = 1; - dds_info->sample_rank = 0; - dds_info->generation_rank = 0; - dds_info->absoulte_generation_rank = 0; - dds_info->source_timestamp = rtps_info.sourceTimestamp; - dds_info->reception_timestamp = rtps_info.receptionTimestamp; - dds_info->instance_handle = rtps_info.iHandle; - dds_info->publication_handle = fastrtps::rtps::InstanceHandle_t(rtps_info.sample_identity.writer_guid()); - dds_info->sample_identity = rtps_info.sample_identity; - dds_info->related_sample_identity = rtps_info.related_sample_identity; - dds_info->valid_data = rtps_info.sampleKind == eprosima::fastrtps::rtps::ALIVE ? true : false; - - switch (rtps_info.sampleKind) - { - case eprosima::fastrtps::rtps::ALIVE: - dds_info->instance_state = ALIVE_INSTANCE_STATE; - break; - case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED: - dds_info->instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE; - break; - default: - //TODO [ILG] change this if the other kinds ever get implemented - dds_info->instance_state = ALIVE_INSTANCE_STATE; - break; - } -} - static bool collections_have_same_properties( const LoanableCollection& data_values, const SampleInfoSeq& sample_infos) @@ -131,11 +97,7 @@ DataReaderImpl::DataReaderImpl( , topic_(topic) , qos_(&qos == &DATAREADER_QOS_DEFAULT ? subscriber_->get_default_datareader_qos() : qos) #pragma warning (disable : 4355 ) - , history_(topic_attributes(), - type_.get(), - qos_.get_readerqos(subscriber_->get_qos()), - type_->m_typeSize + 3, /* Possible alignment */ - qos_.endpoint().history_memory_policy) + , history_(type, *topic, qos_) , listener_(listener) , reader_listener_(this) , deadline_duration_us_(qos_.deadline().period.to_ns() * 1e-3) @@ -714,10 +676,8 @@ ReturnCode_t DataReaderImpl::get_first_untaken_info( return ReturnCode_t::RETCODE_NOT_ENABLED; } - SampleInfo_t rtps_info; - if (history_.get_first_untaken_info(&rtps_info)) + if (history_.get_first_untaken_info(*info)) { - sample_info_to_dds(rtps_info, info); return ReturnCode_t::RETCODE_OK; } return ReturnCode_t::RETCODE_NO_DATA; @@ -892,10 +852,10 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos( bool DataReaderImpl::on_new_cache_change_added( const CacheChange_t* const change) { + std::lock_guard guard(reader_->getMutex()); + if (qos_.deadline().period != c_TimeInfinite) { - std::unique_lock lock(reader_->getMutex()); - if (!history_.set_next_deadline( change->instanceHandle, steady_clock::now() + duration_cast(deadline_duration_us_))) @@ -913,6 +873,7 @@ bool DataReaderImpl::on_new_cache_change_added( } CacheChange_t* new_change = const_cast(change); + history_.update_instance_nts(new_change); if (qos_.lifespan().duration == c_TimeInfinite) { @@ -968,6 +929,11 @@ void DataReaderImpl::update_subscription_matched_status( subscription_matched_status_.last_publication_handle = status.last_publication_handle; } + if (count_change < 0) + { + history_.writer_not_alive(iHandle2GUID(status.last_publication_handle)); + } + StatusMask notify_status = StatusMask::subscription_matched(); DataReaderListener* listener = get_listener_for(notify_status); if (listener != nullptr) @@ -1235,6 +1201,11 @@ RequestedIncompatibleQosStatus& DataReaderImpl::update_requested_incompatible_qo LivelinessChangedStatus& DataReaderImpl::update_liveliness_status( const fastrtps::LivelinessChangedStatus& status) { + if (0 < status.not_alive_count_change) + { + history_.writer_not_alive(iHandle2GUID(status.last_publication_handle)); + } + liveliness_changed_status_.alive_count = status.alive_count; liveliness_changed_status_.not_alive_count = status.not_alive_count; liveliness_changed_status_.alive_count_change += status.alive_count_change; diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp index d083681e19d..44aeafc7152 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp @@ -36,7 +36,6 @@ #include #include -#include #include #include @@ -46,6 +45,8 @@ #include #include +#include + using eprosima::fastrtps::types::ReturnCode_t; namespace eprosima { @@ -331,7 +332,7 @@ class DataReaderImpl DataReaderQos qos_; //!History - fastrtps::SubscriberHistory history_; + detail::DataReaderHistory history_; //!Listener DataReaderListener* listener_ = nullptr; diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp index dbd4a79b6ea..5ba529ebed3 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp @@ -27,7 +27,6 @@ #include #include -#include #include #include @@ -35,6 +34,7 @@ #include #include #include +#include #include #include @@ -51,7 +51,7 @@ namespace detail { struct ReadTakeCommand { using ReturnCode_t = eprosima::fastrtps::types::ReturnCode_t; - using history_type = eprosima::fastrtps::SubscriberHistory; + using history_type = eprosima::fastdds::dds::detail::DataReaderHistory; using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t; using RTPSReader = eprosima::fastrtps::rtps::RTPSReader; using WriterProxy = eprosima::fastrtps::rtps::WriterProxy; @@ -64,7 +64,7 @@ struct ReadTakeCommand SampleInfoSeq& sample_infos, int32_t max_samples, const StateFilter& states, - history_type::instance_info instance, + const history_type::instance_info& instance, bool single_instance = false) : type_(reader.type_) , loan_manager_(reader.loan_manager_) @@ -108,8 +108,8 @@ struct ReadTakeCommand // Traverse changes on current instance bool ret_val = false; LoanableCollection::size_type first_slot = current_slot_; - auto it = instance_.second->begin(); - while (!finished_ && it != instance_.second->end()) + auto it = instance_.second->cache_changes.begin(); + while (!finished_ && it != instance_.second->cache_changes.end()) { CacheChange_t* change = *it; SampleStateKind check; @@ -142,10 +142,9 @@ struct ReadTakeCommand // in the future also if (!is_future_change) { - // Add sample and info to collections ReturnCode_t previous_return_value = return_value_; - bool added = add_sample(change, remove_change); + bool added = add_sample(*it, remove_change); reader_->end_sample_access_nts(change, wp, added); // Check if the payload is dirty @@ -181,6 +180,7 @@ struct ReadTakeCommand if (current_slot_ > first_slot) { + instance_.second->view_state = ViewStateKind::NOT_NEW_VIEW_STATE; ret_val = true; // complete sample infos @@ -208,6 +208,41 @@ struct ReadTakeCommand return return_value_; } + static void generate_info( + SampleInfo& info, + const DataReaderInstance& instance, + const DataReaderCacheChange& item) + { + info.sample_state = item->isRead ? READ_SAMPLE_STATE : NOT_READ_SAMPLE_STATE; + info.instance_state = instance.instance_state; + info.view_state = instance.view_state; + info.disposed_generation_count = item->reader_info.disposed_generation_count; + info.no_writers_generation_count = item->reader_info.no_writers_generation_count; + info.sample_rank = 0; + info.generation_rank = 0; + info.absolute_generation_rank = 0; + info.source_timestamp = item->sourceTimestamp; + info.reception_timestamp = item->reader_info.receptionTimestamp; + info.instance_handle = item->instanceHandle; + info.publication_handle = InstanceHandle_t(item->writerGUID); + info.sample_identity.writer_guid(item->writerGUID); + info.sample_identity.sequence_number(item->sequenceNumber); + info.related_sample_identity = item->write_params.sample_identity(); + info.valid_data = true; + + switch (item->kind) + { + case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED: + case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED: + case eprosima::fastrtps::rtps::NOT_ALIVE_UNREGISTERED: + info.valid_data = false; + break; + case eprosima::fastrtps::rtps::ALIVE: + default: + break; + } + } + private: const TypeSupport& type_; @@ -243,14 +278,15 @@ struct ReadTakeCommand bool is_current_instance_valid() { - // We are not implementing instance_state or view_state yet, so all instances will be considered to have - // a valid state. In the future this should check instance_state against states_.instance_states and - // view_state against states_.view_states - return true; + // Check instance_state against states_.instance_states and view_state against states_.view_states + auto instance_state = instance_.second->instance_state; + auto view_state = instance_.second->view_state; + return (0 != (states_.instance_states & instance_state)) && (0 != (states_.view_states & view_state)); } bool next_instance() { + history_.check_and_remove_instance(instance_); if (single_instance_) { finished_ = true; @@ -270,7 +306,7 @@ struct ReadTakeCommand } bool add_sample( - CacheChange_t* change, + const DataReaderCacheChange& item, bool& deserialization_error) { bool ret_val = false; @@ -284,10 +320,10 @@ struct ReadTakeCommand sample_infos_.length(new_len); // Add information - generate_info(change); + generate_info(item); if (sample_infos_[current_slot_].valid_data) { - if (!deserialize_sample(change)) + if (!deserialize_sample(item)) { // Decrement length of collections data_values_.length(current_slot_); @@ -295,11 +331,10 @@ struct ReadTakeCommand deserialization_error = true; return false; } - - // Mark that some data is available - return_value_ = ReturnCode_t::RETCODE_OK; } + // Mark that some data is available + return_value_ = ReturnCode_t::RETCODE_OK; ++current_slot_; --remaining_samples_; ret_val = true; @@ -330,48 +365,18 @@ struct ReadTakeCommand } void generate_info( - CacheChange_t* change) + const DataReaderCacheChange& item) { // Loan when necessary if (!sample_infos_.has_ownership()) { - SampleInfo* item = info_pool_.get_item(); - assert(item != nullptr); - const_cast(sample_infos_.buffer())[current_slot_] = item; + SampleInfo* pool_item = info_pool_.get_item(); + assert(pool_item != nullptr); + const_cast(sample_infos_.buffer())[current_slot_] = pool_item; } SampleInfo& info = sample_infos_[current_slot_]; - info.sample_state = change->isRead ? READ_SAMPLE_STATE : NOT_READ_SAMPLE_STATE; - info.view_state = NOT_NEW_VIEW_STATE; - info.disposed_generation_count = 0; - info.no_writers_generation_count = 1; - info.sample_rank = 0; - info.generation_rank = 0; - info.absoulte_generation_rank = 0; - info.source_timestamp = change->sourceTimestamp; - info.reception_timestamp = change->reader_info.receptionTimestamp; - info.instance_handle = handle_; - info.publication_handle = InstanceHandle_t(change->writerGUID); - info.sample_identity.writer_guid(change->writerGUID); - info.sample_identity.sequence_number(change->sequenceNumber); - info.related_sample_identity = change->write_params.sample_identity(); - info.valid_data = true; - - switch (change->kind) - { - case eprosima::fastrtps::rtps::ALIVE: - info.instance_state = ALIVE_INSTANCE_STATE; - break; - case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED: - case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED: - info.instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE; - info.valid_data = false; - break; - default: - //TODO [ILG] change this if the other kinds ever get implemented - info.instance_state = ALIVE_INSTANCE_STATE; - break; - } + generate_info(info, *instance_.second, item); } bool check_datasharing_validity( diff --git a/src/cpp/fastdds/subscriber/history/DataReaderCacheChange.hpp b/src/cpp/fastdds/subscriber/history/DataReaderCacheChange.hpp new file mode 100644 index 00000000000..5b13109f721 --- /dev/null +++ b/src/cpp/fastdds/subscriber/history/DataReaderCacheChange.hpp @@ -0,0 +1,39 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DataReaderCacheChange.hpp + */ + +#ifndef _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERCACHECHANGE_HPP_ +#define _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERCACHECHANGE_HPP_ + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +/// A DataReader cache entry +using DataReaderCacheChange = fastrtps::rtps::CacheChange_t*; + +} /* namespace detail */ +} /* namespace dds */ +} /* namespace fastdds */ +} /* namespace eprosima */ + +#endif // _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERCACHECHANGE_HPP_ diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp new file mode 100644 index 00000000000..9ca5826bf1c --- /dev/null +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -0,0 +1,707 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DataReaderHistory.cpp + */ + +#include +#include + +#include "DataReaderHistory.hpp" + +#include +#include + +#include +#include + +#include +#include +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +using namespace eprosima::fastrtps::rtps; + +using eprosima::fastrtps::RecursiveTimedMutex; + +static HistoryAttributes to_history_attributes( + const TypeSupport& type, + const DataReaderQos& qos) +{ + auto initial_samples = qos.resource_limits().allocated_samples; + auto max_samples = qos.resource_limits().max_samples; + + if (qos.history().kind != KEEP_ALL_HISTORY_QOS) + { + max_samples = qos.history().depth; + if (type->m_isGetKeyDefined) + { + max_samples *= qos.resource_limits().max_instances; + } + + initial_samples = std::min(initial_samples, max_samples); + } + + auto mempolicy = qos.endpoint().history_memory_policy; + auto payloadMaxSize = type->m_typeSize + 3; // possible alignment + + return HistoryAttributes(mempolicy, payloadMaxSize, initial_samples, max_samples); +} + +DataReaderHistory::DataReaderHistory( + const TypeSupport& type, + const TopicDescription& topic, + const DataReaderQos& qos) + : ReaderHistory(to_history_attributes(type, qos)) + , key_writers_allocation_(qos.reader_resource_limits().matched_publisher_allocation) + , history_qos_(qos.history()) + , resource_limited_qos_(qos.resource_limits()) + , topic_name_(topic.get_name()) + , type_name_(topic.get_type_name()) + , has_keys_(type->m_isGetKeyDefined) + , type_(type.get()) + , get_key_object_(nullptr) +{ + if (resource_limited_qos_.max_samples == 0) + { + resource_limited_qos_.max_samples = std::numeric_limits::max(); + } + + if (resource_limited_qos_.max_instances == 0) + { + resource_limited_qos_.max_instances = std::numeric_limits::max(); + } + + if (resource_limited_qos_.max_samples_per_instance == 0) + { + resource_limited_qos_.max_samples_per_instance = std::numeric_limits::max(); + } + + if (type_->m_isGetKeyDefined) + { + get_key_object_ = type_->createData(); + + if (resource_limited_qos_.max_samples_per_instance < std::numeric_limits::max()) + { + key_changes_allocation_.maximum = resource_limited_qos_.max_samples_per_instance; + } + } + else + { + resource_limited_qos_.max_instances = 1; + resource_limited_qos_.max_samples_per_instance = resource_limited_qos_.max_samples; + key_changes_allocation_.initial = resource_limited_qos_.allocated_samples; + key_changes_allocation_.maximum = resource_limited_qos_.max_samples; + + keyed_changes_.emplace(c_InstanceHandle_Unknown, + DataReaderInstance{ key_changes_allocation_, key_writers_allocation_ }); + } + + using std::placeholders::_1; + using std::placeholders::_2; + + receive_fn_ = qos.history().kind == KEEP_ALL_HISTORY_QOS ? + std::bind(&DataReaderHistory::received_change_keep_all, this, _1, _2) : + std::bind(&DataReaderHistory::received_change_keep_last, this, _1, _2); + + complete_fn_ = qos.history().kind == KEEP_ALL_HISTORY_QOS ? + std::bind(&DataReaderHistory::completed_change_keep_all, this, _1, _2) : + std::bind(&DataReaderHistory::completed_change_keep_last, this, _1, _2); + + if (!has_keys_) + { + compute_key_for_change_fn_ = [](CacheChange_t* change) + { + change->instanceHandle = c_InstanceHandle_Unknown; + return true; + }; + } + else + { + compute_key_for_change_fn_ = + [this](CacheChange_t* a_change) + { + if (a_change->instanceHandle.isDefined()) + { + return true; + } + + if (!a_change->is_fully_assembled()) + { + return false; + } + + if (type_ != nullptr) + { + logInfo(SUBSCRIBER, "Getting Key of change with no Key transmitted"); + type_->deserialize(&a_change->serializedPayload, get_key_object_); + bool is_key_protected = false; +#if HAVE_SECURITY + is_key_protected = mp_reader->getAttributes().security_attributes().is_key_protected; +#endif // if HAVE_SECURITY + return type_->getKey(get_key_object_, &a_change->instanceHandle, is_key_protected); + } + + logWarning(SUBSCRIBER, "NO KEY in topic: " << topic_name_ + << " and no method to obtain it"; ); + return false; + }; + } +} + +DataReaderHistory::~DataReaderHistory() +{ + if (type_->m_isGetKeyDefined) + { + type_->deleteData(get_key_object_); + } +} + +bool DataReaderHistory::can_change_be_added_nts( + const GUID_t& writer_guid, + uint32_t total_payload_size, + size_t unknown_missing_changes_up_to, + bool& will_never_be_accepted) const +{ + if (!ReaderHistory::can_change_be_added_nts(writer_guid, total_payload_size, unknown_missing_changes_up_to, + will_never_be_accepted)) + { + return false; + } + + will_never_be_accepted = false; + return (KEEP_ALL_HISTORY_QOS != history_qos_.kind) || + (m_changes.size() + unknown_missing_changes_up_to < static_cast(resource_limited_qos_.max_samples)); +} + +bool DataReaderHistory::received_change( + CacheChange_t* a_change, + size_t unknown_missing_changes_up_to) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + + std::lock_guard guard(*mp_mutex); + return receive_fn_(a_change, unknown_missing_changes_up_to); +} + +bool DataReaderHistory::received_change_keep_all( + CacheChange_t* a_change, + size_t unknown_missing_changes_up_to) +{ + if (!compute_key_for_change_fn_(a_change)) + { + // Store the sample temporally only in ReaderHistory. When completed it will be stored in SubscriberHistory too. + return add_to_reader_history_if_not_full(a_change); + } + + InstanceCollection::iterator vit; + if (find_key(a_change->instanceHandle, vit)) + { + DataReaderInstance::ChangeCollection& instance_changes = vit->second.cache_changes; + size_t total_size = instance_changes.size() + unknown_missing_changes_up_to; + if (total_size < static_cast(resource_limited_qos_.max_samples_per_instance)) + { + return add_received_change_with_key(a_change, vit->second); + } + + logInfo(SUBSCRIBER, "Change not added due to maximum number of samples per instance"); + } + + return false; +} + +bool DataReaderHistory::received_change_keep_last( + CacheChange_t* a_change, + size_t /* unknown_missing_changes_up_to */) +{ + if (!compute_key_for_change_fn_(a_change)) + { + // Store the sample temporally only in ReaderHistory. When completed it will be stored in SubscriberHistory too. + return add_to_reader_history_if_not_full(a_change); + } + + InstanceCollection::iterator vit; + if (find_key(a_change->instanceHandle, vit)) + { + bool add = false; + DataReaderInstance::ChangeCollection& instance_changes = vit->second.cache_changes; + if (instance_changes.size() < static_cast(history_qos_.depth)) + { + add = true; + } + else + { + // Try to substitute the oldest sample. + CacheChange_t* first_change = instance_changes.at(0); + if (a_change->sourceTimestamp < first_change->sourceTimestamp) + { + // Received change is older than oldest, and should be discarded + return true; + } + + // As the instance is ordered by source timestamp, we can always remove the first one. + add = remove_change_sub(first_change); + } + + if (add) + { + return add_received_change_with_key(a_change, vit->second); + } + } + + return false; +} + +bool DataReaderHistory::add_received_change_with_key( + CacheChange_t* a_change, + DataReaderInstance& instance) +{ + if (add_to_reader_history_if_not_full(a_change)) + { + add_to_instance(a_change, instance); + return true; + } + + return false; +} + +bool DataReaderHistory::add_to_reader_history_if_not_full( + CacheChange_t* a_change) +{ + if (m_isHistoryFull) + { + // Discarding the sample. + logWarning(SUBSCRIBER, "Attempting to add Data to Full ReaderHistory: " << type_name_); + return false; + } + + bool ret_value = add_change(a_change); + if (m_changes.size() == static_cast(m_att.maximumReservedCaches)) + { + m_isHistoryFull = true; + } + return ret_value; +} + +void DataReaderHistory::add_to_instance( + CacheChange_t* a_change, + DataReaderInstance& instance) +{ + // ADD TO KEY VECTOR + DataReaderCacheChange item = a_change; + eprosima::utilities::collections::sorted_vector_insert(instance.cache_changes, item, + [](const DataReaderCacheChange& lhs, const DataReaderCacheChange& rhs) + { + return lhs->sourceTimestamp < rhs->sourceTimestamp; + }); + + logInfo(SUBSCRIBER, mp_reader->getGuid().entityId + << ": Change " << a_change->sequenceNumber << " added from: " + << a_change->writerGUID << " with KEY: " << a_change->instanceHandle; ); +} + +bool DataReaderHistory::get_first_untaken_info( + SampleInfo& info) +{ + std::lock_guard lock(*mp_mutex); + + CacheChange_t* change = nullptr; + WriterProxy* wp = nullptr; + if (mp_reader->nextUntakenCache(&change, &wp)) + { + auto it = keyed_changes_.find(change->instanceHandle); + assert(it != keyed_changes_.end()); + auto& instance_changes = it->second.cache_changes; + auto item = + std::find_if(instance_changes.cbegin(), instance_changes.cend(), + [change](const DataReaderCacheChange& v) + { + return v == change; + }); + ReadTakeCommand::generate_info(info, it->second, *item); + mp_reader->change_read_by_user(change, wp, false); + return true; + } + + return false; +} + +bool DataReaderHistory::find_key( + const InstanceHandle_t& handle, + InstanceCollection::iterator& vit_out) +{ + InstanceCollection::iterator vit; + vit = keyed_changes_.find(handle); + if (vit != keyed_changes_.end()) + { + vit_out = vit; + return true; + } + + if (keyed_changes_.size() < static_cast(resource_limited_qos_.max_instances)) + { + vit_out = keyed_changes_.emplace(handle, + DataReaderInstance{key_changes_allocation_, key_writers_allocation_}).first; + return true; + } + + for (vit = keyed_changes_.begin(); vit != keyed_changes_.end(); ++vit) + { + if (vit->second.cache_changes.size() == 0) + { + keyed_changes_.erase(vit); + vit_out = keyed_changes_.emplace(handle, + DataReaderInstance{ key_changes_allocation_, key_writers_allocation_ }).first; + return true; + } + } + + logWarning(SUBSCRIBER, "History has reached the maximum number of instances"); + return false; +} + +void DataReaderHistory::writer_unmatched( + const GUID_t& writer_guid, + const SequenceNumber_t& last_notified_seq) +{ + remove_changes_with_pred( + [&writer_guid, &last_notified_seq](CacheChange_t* ch) + { + return (writer_guid == ch->writerGUID) && (last_notified_seq < ch->sequenceNumber); + }); +} + +bool DataReaderHistory::remove_change_sub( + CacheChange_t* change) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + + std::lock_guard guard(*mp_mutex); + bool found = false; + InstanceCollection::iterator vit; + if (find_key(change->instanceHandle, vit)) + { + for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit) + { + if ((*chit)->sequenceNumber == change->sequenceNumber && + (*chit)->writerGUID == change->writerGUID) + { + vit->second.cache_changes.erase(chit); + found = true; + break; + } + } + } + if (!found) + { + logError(SUBSCRIBER, "Change not found on this key, something is wrong"); + } + + if (remove_change(change)) + { + m_isHistoryFull = false; + return true; + } + + return false; +} + +bool DataReaderHistory::remove_change_sub( + CacheChange_t* change, + DataReaderInstance::ChangeCollection::iterator& it) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + + std::lock_guard guard(*mp_mutex); + bool found = false; + InstanceCollection::iterator vit; + if (find_key(change->instanceHandle, vit)) + { + for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit) + { + if ((*chit)->sequenceNumber == change->sequenceNumber && + (*chit)->writerGUID == change->writerGUID) + { + assert(it == chit); + it = vit->second.cache_changes.erase(chit); + found = true; + break; + } + } + } + if (!found) + { + logError(SUBSCRIBER, "Change not found on this key, something is wrong"); + } + + const_iterator chit = find_change_nts(change); + if (chit == changesEnd()) + { + logInfo(RTPS_WRITER_HISTORY, "Trying to remove a change not in history"); + return false; + } + + m_isHistoryFull = false; + ReaderHistory::remove_change_nts(chit); + + return true; +} + +bool DataReaderHistory::set_next_deadline( + const InstanceHandle_t& handle, + const std::chrono::steady_clock::time_point& next_deadline_us) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + std::lock_guard guard(*mp_mutex); + auto it = keyed_changes_.find(handle); + if (it == keyed_changes_.end()) + { + return false; + } + + it->second.next_deadline_us = next_deadline_us; + return true; +} + +bool DataReaderHistory::get_next_deadline( + InstanceHandle_t& handle, + std::chrono::steady_clock::time_point& next_deadline_us) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + std::lock_guard guard(*mp_mutex); + auto min = std::min_element(keyed_changes_.begin(), + keyed_changes_.end(), + []( + const std::pair& lhs, + const std::pair& rhs) + { + return lhs.second.next_deadline_us < rhs.second.next_deadline_us; + }); + handle = min->first; + next_deadline_us = min->second.next_deadline_us; + return true; +} + +std::pair DataReaderHistory::lookup_instance( + const InstanceHandle_t& handle, + bool exact) +{ + if (!has_keys_) + { + if (handle.isDefined()) + { + // NO_KEY topics can only return the ficticious instance. + // Execution can only get here for two reasons: + // - Looking for a specific instance (exact = true) + // - Looking for the next instance to the ficticious one (exact = false) + // In both cases, no instance should be returned + return { false, {InstanceHandle_t(), nullptr} }; + } + + if (exact) + { + // Looking for HANDLE_NIL, nothing to return + return { false, {InstanceHandle_t(), nullptr} }; + } + + // Looking for the first instance, return the ficticious one containing all changes + InstanceHandle_t tmp; + tmp.value[0] = 1; + return { true, {tmp, &keyed_changes_.begin()->second} }; + } + + InstanceCollection::iterator it; + + if (exact) + { + it = keyed_changes_.find(handle); + } + else + { + auto comp = [](const InstanceHandle_t& h, const std::pair& it) + { + return h < it.first; + }; + it = std::upper_bound(keyed_changes_.begin(), keyed_changes_.end(), handle, comp); + } + + if (it != keyed_changes_.end()) + { + return { true, {it->first, &(it->second)} }; + } + return { false, {InstanceHandle_t(), nullptr} }; +} + +void DataReaderHistory::check_and_remove_instance( + DataReaderHistory::instance_info& instance_info) +{ + DataReaderInstance* instance = instance_info.second; + if (instance->cache_changes.empty() && + (InstanceStateKind::ALIVE_INSTANCE_STATE != instance->instance_state) && + instance_info.first.isDefined()) + { + keyed_changes_.erase(instance_info.first); + instance_info.second = nullptr; + } +} + +ReaderHistory::iterator DataReaderHistory::remove_change_nts( + ReaderHistory::const_iterator removal, + bool release) +{ + if (removal != changesEnd()) + { + CacheChange_t* p_sample = *removal; + + if (!has_keys_ || p_sample->is_fully_assembled()) + { + // clean any references to this CacheChange in the key state collection + auto it = keyed_changes_.find(p_sample->instanceHandle); + + // if keyed and in history must be in the map + assert(it != keyed_changes_.end()); + + auto& c = it->second.cache_changes; + c.erase(std::remove(c.begin(), c.end(), p_sample), c.end()); + } + } + + // call the base class + return ReaderHistory::remove_change_nts(removal, release); +} + +bool DataReaderHistory::completed_change( + CacheChange_t* change) +{ + bool ret_value = true; + + if (!change->instanceHandle.isDefined()) + { + InstanceCollection::iterator vit; + ret_value = compute_key_for_change_fn_(change) && find_key(change->instanceHandle, vit); + if (ret_value) + { + ret_value = !change->instanceHandle.isDefined() || complete_fn_(change, vit->second); + } + + if (!ret_value) + { + const_iterator chit = find_change_nts(change); + if (chit != changesEnd()) + { + m_isHistoryFull = false; + remove_change_nts(chit); + } + else + { + logError(SUBSCRIBER, "Change should exist but didn't find it"); + } + } + } + + return ret_value; +} + +bool DataReaderHistory::completed_change_keep_all( + CacheChange_t* change, + DataReaderInstance& instance) +{ + DataReaderInstance::ChangeCollection& instance_changes = instance.cache_changes; + if (instance_changes.size() < static_cast(resource_limited_qos_.max_samples_per_instance)) + { + add_to_instance(change, instance); + return true; + } + + logWarning(SUBSCRIBER, "Change not added due to maximum number of samples per instance"); + return false; +} + +bool DataReaderHistory::completed_change_keep_last( + CacheChange_t* change, + DataReaderInstance& instance) +{ + bool add = false; + DataReaderInstance::ChangeCollection& instance_changes = instance.cache_changes; + if (instance_changes.size() < static_cast(history_qos_.depth)) + { + add = true; + } + else + { + // Try to substitute the oldest sample. + + // As the instance should be ordered following the presentation QoS, we can always remove the first one. + add = remove_change_sub(instance_changes.at(0)); + } + + if (add) + { + add_to_instance(change, instance); + return true; + } + + return false; +} + +void DataReaderHistory::update_instance_nts( + CacheChange_t* const change) +{ + InstanceCollection::iterator vit; + vit = keyed_changes_.find(change->instanceHandle); + + assert(vit != keyed_changes_.end()); + vit->second.update_state(change->kind, change->writerGUID); + change->reader_info.disposed_generation_count = vit->second.disposed_generation_count; + change->reader_info.no_writers_generation_count = vit->second.no_writers_generation_count; +} + +void DataReaderHistory::writer_not_alive( + const fastrtps::rtps::GUID_t& writer_guid) +{ + for (auto& it : keyed_changes_) + { + it.second.writer_removed(writer_guid); + } +} + +} // namespace detail +} // namsepace dds +} // namespace fastdds +} // namsepace eprosima diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp new file mode 100644 index 00000000000..629ce5f8f1f --- /dev/null +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp @@ -0,0 +1,313 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DataReaderHistory.hpp + */ + +#ifndef _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERHISTORY_HPP_ +#define _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERHISTORY_HPP_ + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "DataReaderInstance.hpp" + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +/** + * Class DataReaderHistory, container of the different CacheChanges of a DataReader + */ +class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory +{ +public: + + using MemoryManagementPolicy_t = eprosima::fastrtps::rtps::MemoryManagementPolicy_t; + using InstanceHandle_t = eprosima::fastrtps::rtps::InstanceHandle_t; + using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t; + using GUID_t = eprosima::fastrtps::rtps::GUID_t; + using SequenceNumber_t = eprosima::fastrtps::rtps::SequenceNumber_t; + + using instance_info = std::pair; + + /** + * Constructor. Requires information about the DataReader. + * @param type Type information. Needed to know if the type is keyed, as long as the maximum serialized size. + * @param topic Topic description. Topic and type name are used on debug messages. + * @param qos DataReaderQoS policy. History related limits are taken from here. + */ + DataReaderHistory( + const TypeSupport& type, + const TopicDescription& topic, + const DataReaderQos& qos); + + ~DataReaderHistory() override; + + /** + * Remove a specific change from the history. + * No Thread Safe + * @param removal iterator to the CacheChange_t to remove. + * @param release defaults to true and hints if the CacheChange_t should return to the pool + * @return iterator to the next CacheChange_t or end iterator. + */ + iterator remove_change_nts( + const_iterator removal, + bool release = true) override; + + /** + * Check if a new change can be added to this history. + * + * @param [in] writer_guid GUID of the writer where the change came from. + * @param [in] total_payload_size Total payload size of the incoming change. + * @param [in] unknown_missing_changes_up_to The number of changes from the same writer with a lower sequence + * number that could potentially be received in the future. + * @param [out] will_never_be_accepted When the method returns @c false, this parameter will inform + * whether the change could be accepted in the future or not. + * + * @pre change should not be present in the history + * + * @return Whether a call to received_change will succeed when called with the same arguments. + */ + bool can_change_be_added_nts( + const GUID_t& writer_guid, + uint32_t total_payload_size, + size_t unknown_missing_changes_up_to, + bool& will_never_be_accepted) const override; + + /** + * Called when a change is received by the Subscriber. Will add the change to the history. + * @pre Change should not be already present in the history. + * @param[in] change The received change + * @param unknown_missing_changes_up_to Number of missing changes before this one + * @return + */ + bool received_change( + CacheChange_t* change, + size_t unknown_missing_changes_up_to) override; + + /** + * Called when a fragmented change is received completely by the Subscriber. Will find its instance and store it. + * @pre Change should be already present in the history. + * @param[in] change The received change + * @return + */ + bool completed_change( + CacheChange_t* change) override; + + /** + * @brief Returns information about the first untaken sample. + * @param [out] info SampleInfo structure to store first untaken sample information. + * @return true if sample info was returned. false if there is no sample to take. + */ + bool get_first_untaken_info( + SampleInfo& info); + + /** + * This method is called to remove a change from the SubscriberHistory. + * @param change Pointer to the CacheChange_t. + * @return True if removed. + */ + bool remove_change_sub( + CacheChange_t* change); + + /** + * This method is called to remove a change from the SubscriberHistory. + * @param [in] change Pointer to the CacheChange_t. + * @param [in,out] it Iterator pointing to change on input. Will point to next valid change on output. + * @return True if removed. + */ + bool remove_change_sub( + CacheChange_t* change, + DataReaderInstance::ChangeCollection::iterator& it); + + /** + * Called when a writer is unmatched from the reader holding this history. + * + * This method will remove all the changes on the history that came from the writer being unmatched and which have + * not yet been notified to the user. + * + * @param writer_guid GUID of the writer being unmatched. + * @param last_notified_seq Last sequence number from the specified writer that was notified to the user. + */ + void writer_unmatched( + const GUID_t& writer_guid, + const SequenceNumber_t& last_notified_seq) override; + + /** + * @brief A method to set the next deadline for the given instance + * @param handle The handle to the instance + * @param next_deadline_us The time point when the deadline will occur + * @return True if the deadline was set correctly + */ + bool set_next_deadline( + const InstanceHandle_t& handle, + const std::chrono::steady_clock::time_point& next_deadline_us); + + /** + * @brief A method to get the next instance handle that will miss the deadline and the time when the deadline will occur + * @param handle The handle to the instance + * @param next_deadline_us The time point when the instance will miss the deadline + * @return True if the deadline was retrieved successfully + */ + bool get_next_deadline( + InstanceHandle_t& handle, + std::chrono::steady_clock::time_point& next_deadline_us); + + /** + * @brief Get the list of changes corresponding to an instance handle. + * @param handle The handle to the instance. + * @param exact Indicates if the handle should match exactly (true) or if the first instance greater than the + * input handle should be returned. + * @return A pair where: + * - @c first is a boolean indicating if an instance was found + * - @c second is a pair where: + * - @c first is the handle of the returned instance + * - @c second is a pointer to a DataReaderInstance that holds information about the returned instance + * + * @remarks When used on a NO_KEY topic, an instance will only be returned when called with + * `handle = HANDLE_NIL` and `exact = false`. + */ + std::pair lookup_instance( + const InstanceHandle_t& handle, + bool exact); + + void update_instance_nts( + CacheChange_t* const change); + + void writer_not_alive( + const fastrtps::rtps::GUID_t& writer_guid); + + void check_and_remove_instance( + instance_info& instance_info); + +private: + + using InstanceCollection = std::map; + + //!Resource limits for allocating the array of changes per instance + eprosima::fastrtps::ResourceLimitedContainerConfig key_changes_allocation_; + //!Resource limits for allocating the array of alive writers per instance + eprosima::fastrtps::ResourceLimitedContainerConfig key_writers_allocation_; + //!Map where keys are instance handles and values vectors of cache changes + InstanceCollection keyed_changes_; + //!HistoryQosPolicy values. + HistoryQosPolicy history_qos_; + //!ResourceLimitsQosPolicy values. + ResourceLimitsQosPolicy resource_limited_qos_; + //!Topic name + fastrtps::string_255 topic_name_; + //!Type name + fastrtps::string_255 type_name_; + //!Whether the type has keys + bool has_keys_; + //!TopicDataType + fastdds::dds::TopicDataType* type_; + + //!Type object to deserialize Key + void* get_key_object_; + + /// Function processing a received change + std::function receive_fn_; + /// Function to compute the instance handle of a received change + std::function compute_key_for_change_fn_; + /// Function processing a completed fragmented change + std::function complete_fn_; + + /** + * @brief Method that finds a key in m_keyedChanges or tries to add it if not found + * @param a_change The change to get the key from + * @param map_it A map iterator to the given key + * @return True if it was found or could be added to the map + */ + bool find_key( + const InstanceHandle_t& handle, + InstanceCollection::iterator& map_it); + + /** + * @name Variants of incoming change processing. + * Will be called with the history mutex taken. + * @param[in] change The received change + * @param unknown_missing_changes_up_to Number of missing changes before this one + * @return + */ + ///@{ + bool received_change_keep_all( + CacheChange_t* change, + size_t unknown_missing_changes_up_to); + + bool received_change_keep_last( + CacheChange_t* change, + size_t unknown_missing_changes_up_to); + ///@} + + /** + * @name Variants of change reconstruction completion processing. + * Will be called with the history mutex taken. + * @param change The change for which the last missing fragment has been processed. + * @param instance Instance where the change should be added. + * @return true when the change was added to the instance. + * @return false when the change could not be added to the instance and has been removed from the history. + */ + ///@{ + bool completed_change_keep_all( + CacheChange_t* change, + DataReaderInstance& instance); + + bool completed_change_keep_last( + CacheChange_t* change, + DataReaderInstance& instance); + ///@} + + bool add_received_change_with_key( + CacheChange_t* a_change, + DataReaderInstance& instance); + + bool add_to_reader_history_if_not_full( + CacheChange_t* a_change); + + void add_to_instance( + CacheChange_t* a_change, + DataReaderInstance& instance); + +}; + +} // namespace detail +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif // _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERHISTORY_HPP_ diff --git a/src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp b/src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp new file mode 100644 index 00000000000..ed5a4e267ac --- /dev/null +++ b/src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp @@ -0,0 +1,216 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DataReaderInstance.hpp + */ + +#ifndef _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERINSTANCE_HPP_ +#define _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERINSTANCE_HPP_ + +#include +#include + +#include +#include + +#include + +#include "DataReaderCacheChange.hpp" + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +/// Book-keeping information for an instance +struct DataReaderInstance +{ + using ChangeCollection = eprosima::fastrtps::ResourceLimitedVector; + using WriterOwnership = std::pair; + using WriterCollection = eprosima::fastrtps::ResourceLimitedVector; + + //! A vector of DataReader changes belonging to the same instance + ChangeCollection cache_changes; + //! The list of alive writers for this instance + WriterCollection alive_writers; + //! GUID and strength of the current maximum strength writer + WriterOwnership current_owner{ {}, 0 }; + //! The time when the group will miss the deadline + std::chrono::steady_clock::time_point next_deadline_us; + //! Current view state of the instance + ViewStateKind view_state = ViewStateKind::NEW_VIEW_STATE; + //! Current instance state of the instance + InstanceStateKind instance_state = InstanceStateKind::ALIVE_INSTANCE_STATE; + //! Current disposed generation of the instance + int32_t disposed_generation_count = 0; + //! Current no_writers generation of the instance + int32_t no_writers_generation_count = 0; + + DataReaderInstance( + const eprosima::fastrtps::ResourceLimitedContainerConfig& changes_allocation, + const eprosima::fastrtps::ResourceLimitedContainerConfig& writers_allocation) + : cache_changes(changes_allocation) + , alive_writers(writers_allocation) + { + } + + bool update_state( + const fastrtps::rtps::ChangeKind_t change_kind, + const fastrtps::rtps::GUID_t& writer_guid, + const uint32_t ownership_strength = 0) + { + bool ret_val = false; + + switch (change_kind) + { + case fastrtps::rtps::ALIVE: + ret_val = writer_alive(writer_guid, ownership_strength); + break; + + case fastrtps::rtps::NOT_ALIVE_DISPOSED: + ret_val = writer_dispose(writer_guid, ownership_strength); + break; + + case fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED: + ret_val = writer_dispose(writer_guid, ownership_strength); + ret_val |= writer_unregister(writer_guid); + break; + + case fastrtps::rtps::NOT_ALIVE_UNREGISTERED: + ret_val = writer_unregister(writer_guid); + break; + + default: + // TODO (Miguel C): log error / assert + break; + } + + return ret_val; + } + + bool writer_removed( + const fastrtps::rtps::GUID_t& writer_guid) + { + return writer_unregister(writer_guid); + } + +private: + + bool writer_alive( + const fastrtps::rtps::GUID_t& writer_guid, + const uint32_t ownership_strength) + { + bool ret_val = false; + + if (ownership_strength >= current_owner.second) + { + current_owner.first = writer_guid; + current_owner.second = ownership_strength; + + if (InstanceStateKind::NOT_ALIVE_DISPOSED_INSTANCE_STATE == instance_state) + { + ++disposed_generation_count; + alive_writers.clear(); + view_state = ViewStateKind::NEW_VIEW_STATE; + ret_val = true; + } + else if (InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE == instance_state) + { + ++no_writers_generation_count; + alive_writers.clear(); + view_state = ViewStateKind::NEW_VIEW_STATE; + ret_val = true; + } + + writer_set(writer_guid, ownership_strength); + instance_state = InstanceStateKind::ALIVE_INSTANCE_STATE; + } + + return ret_val; + } + + bool writer_dispose( + const fastrtps::rtps::GUID_t& writer_guid, + const uint32_t ownership_strength) + { + bool ret_val = false; + + writer_set(writer_guid, ownership_strength); + if (ownership_strength >= current_owner.second) + { + current_owner.first = writer_guid; + current_owner.second = ownership_strength; + + if (InstanceStateKind::ALIVE_INSTANCE_STATE == instance_state) + { + ret_val = true; + instance_state = InstanceStateKind::NOT_ALIVE_DISPOSED_INSTANCE_STATE; + } + } + + return ret_val; + } + + bool writer_unregister( + const fastrtps::rtps::GUID_t& writer_guid) + { + bool ret_val = false; + + alive_writers.remove_if([&writer_guid](const WriterOwnership& item) + { + return item.first == writer_guid; + }); + + if (writer_guid == current_owner.first) + { + current_owner.second = 0; + current_owner.first = fastrtps::rtps::c_Guid_Unknown; + } + + if (alive_writers.empty() && (InstanceStateKind::ALIVE_INSTANCE_STATE == instance_state)) + { + ret_val = true; + instance_state = InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE; + } + + return ret_val; + } + + void writer_set( + const fastrtps::rtps::GUID_t& writer_guid, + const uint32_t ownership_strength) + { + auto it = std::find_if(alive_writers.begin(), alive_writers.end(), [&writer_guid](const WriterOwnership& item) + { + return item.first == writer_guid; + }); + if (it == alive_writers.end()) + { + alive_writers.emplace_back(writer_guid, ownership_strength); + } + else + { + it->second = ownership_strength; + } + } + +}; + +} /* namespace detail */ +} /* namespace dds */ +} /* namespace fastdds */ +} /* namespace eprosima */ + +#endif // _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERCACHECHANGE_HPP_ diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp index 9de26f80dd5..39a7b4542d6 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp @@ -693,58 +693,6 @@ bool SubscriberHistory::get_next_deadline( return false; } -std::pair SubscriberHistory::lookup_instance( - const InstanceHandle_t& handle, - bool exact) -{ - if (topic_att_.getTopicKind() == NO_KEY) - { - if (handle.isDefined()) - { - // NO_KEY topics can only return the ficticious instance. - // Execution can only get here for two reasons: - // - Looking for a specific instance (exact = true) - // - Looking for the next instance to the ficticious one (exact = false) - // In both cases, no instance should be returned - return { false, {InstanceHandle_t(), nullptr} }; - } - else - { - if (exact) - { - // Looking for HANDLE_NIL, nothing to return - return { false, {InstanceHandle_t(), nullptr} }; - } - - // Looking for the first instance, return the ficticious one containing all changes - InstanceHandle_t tmp; - tmp.value[0] = 1; - return { true, {tmp, &m_changes} }; - } - } - - t_m_Inst_Caches::iterator it; - - if (exact) - { - it = keyed_changes_.find(handle); - } - else - { - auto comp = [](const InstanceHandle_t& h, const std::pair& it) - { - return h < it.first; - }; - it = std::upper_bound(keyed_changes_.begin(), keyed_changes_.end(), handle, comp); - } - - if (it != keyed_changes_.end()) - { - return { true, {it->first, &(it->second.cache_changes)} }; - } - return { false, {InstanceHandle_t(), nullptr} }; -} - ReaderHistory::iterator SubscriberHistory::remove_change_nts( ReaderHistory::const_iterator removal, bool release) diff --git a/src/cpp/rtps/history/ReaderHistory.cpp b/src/cpp/rtps/history/ReaderHistory.cpp index 6c4050d8e4a..a7baba15b44 100644 --- a/src/cpp/rtps/history/ReaderHistory.cpp +++ b/src/cpp/rtps/history/ReaderHistory.cpp @@ -158,38 +158,33 @@ History::iterator ReaderHistory::remove_change_nts( return ret_val; } +void ReaderHistory::writer_unmatched( + const GUID_t& writer_guid, + const SequenceNumber_t& last_notified_seq) +{ + static_cast(last_notified_seq); + remove_changes_with_pred( + [&writer_guid](CacheChange_t* ch) + { + return writer_guid == ch->writerGUID; + }); +} + bool ReaderHistory::remove_changes_with_guid( const GUID_t& a_guid) { - std::vector changes_to_remove; - if (mp_reader == nullptr || mp_mutex == nullptr) { logError(RTPS_READER_HISTORY, "You need to create a Reader with History before removing any changes"); return false; } - { - //Lock scope - std::lock_guard guard(*mp_mutex); - for (std::vector::iterator chit = m_changes.begin(); chit != m_changes.end(); ++chit) + remove_changes_with_pred( + [a_guid](CacheChange_t* ch) { - if ((*chit)->writerGUID == a_guid) - { - changes_to_remove.push_back((*chit)); - } - } - }//End lock scope + return a_guid == ch->writerGUID; + }); - for (std::vector::iterator chit = changes_to_remove.begin(); chit != changes_to_remove.end(); - ++chit) - { - if (!remove_change(*chit)) - { - logError(RTPS_READER_HISTORY, "One of the cachechanged in the GUID removal bulk could not be removed"); - return false; - } - } return true; } diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index 3cb0c7939d9..74117e69ea2 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -285,7 +285,7 @@ bool StatefulReader::matched_writer_remove( if (is_alive_) { //Remove cachechanges belonging to the unmatched writer - mp_history->remove_changes_with_guid(writer_guid); + mp_history->writer_unmatched(writer_guid, get_last_notified(writer_guid)); if (liveliness_lease_duration_ < c_TimeInfinite) { @@ -802,34 +802,23 @@ bool StatefulReader::change_removed_by_history( { std::lock_guard guard(mp_mutex); - if (is_alive_) + if (is_alive_ && a_change->is_fully_assembled()) { if (wp != nullptr || matched_writer_lookup(a_change->writerGUID, &wp)) { - if (a_change->is_fully_assembled()) - { - if (!a_change->isRead && wp->available_changes_max() >= a_change->sequenceNumber) - { - if (0 < total_unread_) - { - --total_unread_; - } - } - - - wp->change_removed_from_history(a_change->sequenceNumber); - } - return true; + wp->change_removed_from_history(a_change->sequenceNumber); } - else + + if (!a_change->isRead && + get_last_notified(a_change->writerGUID) >= a_change->sequenceNumber) { - if (a_change->writerGUID.entityId != m_trustedWriterEntityId) + if (0 < total_unread_) { - // trusted entities messages mean no havoc - logError(RTPS_READER, - " You should always find the WP associated with a change, something is very wrong"); + --total_unread_; } + } + return true; } //Simulate a datasharing notification to process any pending payloads that were waiting due to full history @@ -1156,16 +1145,14 @@ bool StatefulReader::begin_sample_access_nts( const GUID_t& writer_guid = change->writerGUID; is_future_change = false; - if (!matched_writer_lookup(writer_guid, &wp)) - { - return false; - } - - SequenceNumber_t seq; - seq = wp->available_changes_max(); - if (seq < change->sequenceNumber) + if (matched_writer_lookup(writer_guid, &wp)) { - is_future_change = true; + SequenceNumber_t seq; + seq = wp->available_changes_max(); + if (seq < change->sequenceNumber) + { + is_future_change = true; + } } return true; @@ -1184,8 +1171,7 @@ void StatefulReader::change_read_by_user( WriterProxy* writer, bool mark_as_read) { - assert(writer != nullptr); - assert(change->writerGUID == writer->guid()); + assert(!writer || change->writerGUID == writer->guid()); // Mark change as read if (mark_as_read && !change->isRead) @@ -1198,7 +1184,7 @@ void StatefulReader::change_read_by_user( } // If not datasharing, we are done - if (!writer->is_datasharing_writer()) + if (!writer || !writer->is_datasharing_writer()) { return; } diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index 0d67b0dfe78..b65c073f409 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -178,7 +178,7 @@ bool StatelessReader::matched_writer_remove( std::lock_guard guard(mp_mutex); //Remove cachechanges belonging to the unmatched writer - mp_history->remove_changes_with_guid(writer_guid); + mp_history->writer_unmatched(writer_guid, get_last_notified(writer_guid)); if (liveliness_lease_duration_ < c_TimeInfinite) { diff --git a/src/cpp/utils/SystemInfo.cpp b/src/cpp/utils/SystemInfo.cpp index fb98234b3a1..a860540a96d 100644 --- a/src/cpp/utils/SystemInfo.cpp +++ b/src/cpp/utils/SystemInfo.cpp @@ -161,10 +161,11 @@ FileWatchHandle SystemInfo::watch_file( break; } })); -#endif // defined(_WIN32) || defined(__unix__) +#else // defined(_WIN32) || defined(__unix__) static_cast(filename); static_cast(callback); return FileWatchHandle(); +#endif // defined(_WIN32) || defined(__unix__) } void SystemInfo::stop_watching_file( diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index c2df5d695b5..0a27fbdbb40 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -1571,8 +1571,9 @@ class PubSubReader std::unique_lock lock(mutex_); // Check order of changes. - ASSERT_LT(last_seq[info.instance_handle], info.sample_identity.sequence_number()); - last_seq[info.instance_handle] = info.sample_identity.sequence_number(); + LastSeqInfo seq_info{ info.instance_handle, info.sample_identity.writer_guid() }; + ASSERT_LT(last_seq[seq_info], info.sample_identity.sequence_number()); + last_seq[seq_info] = info.sample_identity.sequence_number(); if (info.instance_state == eprosima::fastdds::dds::ALIVE_INSTANCE_STATE) { @@ -1657,7 +1658,8 @@ class PubSubReader unsigned int participant_matched_; std::atomic receiving_; eprosima::fastdds::dds::TypeSupport type_; - std::map last_seq; + using LastSeqInfo = std::pair; + std::map last_seq; size_t current_processed_count_; size_t number_samples_expected_; bool discovery_result_; diff --git a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h index 932e4c58de9..840a9e1bce1 100644 --- a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h +++ b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h @@ -133,10 +133,22 @@ class ReaderHistory return m_changes.erase(removal); } + virtual void writer_unmatched( + const GUID_t& /*writer_guid*/, + const SequenceNumber_t& /*last_notified_seq*/) + { + } + HistoryAttributes m_att; protected: + template + inline void remove_changes_with_pred( + Pred) + { + } + RTPSReader* mp_reader; RecursiveTimedMutex* mp_mutex; std::vector m_changes; diff --git a/test/unittest/dds/publisher/CMakeLists.txt b/test/unittest/dds/publisher/CMakeLists.txt index 17fcb776640..3b5305eb46f 100644 --- a/test/unittest/dds/publisher/CMakeLists.txt +++ b/test/unittest/dds/publisher/CMakeLists.txt @@ -75,6 +75,7 @@ set(DATAWRITERTESTS_SOURCE DataWriterTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReaderImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/DataReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/Subscriber.cpp diff --git a/test/unittest/dds/status/CMakeLists.txt b/test/unittest/dds/status/CMakeLists.txt index e68d3c33ee0..ae5a52d868f 100644 --- a/test/unittest/dds/status/CMakeLists.txt +++ b/test/unittest/dds/status/CMakeLists.txt @@ -36,6 +36,7 @@ set(LISTENERTESTS_SOURCE ListenerTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/SubscriberImpl.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReaderImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/SubscriberQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/DataReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp diff --git a/test/unittest/dds/subscriber/DataReaderTests.cpp b/test/unittest/dds/subscriber/DataReaderTests.cpp index 12e029e7299..a1c76f15a5c 100644 --- a/test/unittest/dds/subscriber/DataReaderTests.cpp +++ b/test/unittest/dds/subscriber/DataReaderTests.cpp @@ -1550,6 +1550,311 @@ TEST_F(DataReaderTests, read_unread) } } +TEST_F(DataReaderTests, sample_info) +{ + DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT; + reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS; + reader_qos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS; + reader_qos.history().kind = KEEP_LAST_HISTORY_QOS; + reader_qos.history().depth = 1; + reader_qos.resource_limits().max_instances = 2; + reader_qos.resource_limits().max_samples_per_instance = 1; + reader_qos.resource_limits().max_samples = 2; + + create_entities(nullptr, reader_qos); + publisher_->delete_datawriter(data_writer_); + data_writer_ = nullptr; + + struct TestCmd + { + enum Operation + { + WRITE, UNREGISTER, DISPOSE, CLOSE + }; + + size_t writer_index; + Operation operation; + size_t instance_index; + }; + + struct TestInstanceResult + { + ReturnCode_t ret_code; + ViewStateKind view_state; + InstanceStateKind instance_state; + int32_t disposed_generation_count; + int32_t no_writers_generation_count; + }; + + struct TestStep + { + std::vector operations; + TestInstanceResult instance_state[2]; + }; + + struct TestState + { + TestState( + TypeSupport& type, + Topic* topic, + Publisher* publisher) + : topic_(topic) + , publisher_(publisher) + { + writer_qos_ = DATAWRITER_QOS_DEFAULT; + writer_qos_.publish_mode().kind = SYNCHRONOUS_PUBLISH_MODE; + writer_qos_.reliability().kind = RELIABLE_RELIABILITY_QOS; + writer_qos_.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS; + writer_qos_.history().kind = KEEP_LAST_HISTORY_QOS; + writer_qos_.history().depth = 1; + writer_qos_.resource_limits().max_instances = 2; + writer_qos_.resource_limits().max_samples_per_instance = 1; + writer_qos_.resource_limits().max_samples = 2; + writer_qos_.writer_data_lifecycle().autodispose_unregistered_instances = false; + + data_[0].index(1); + data_[1].index(2); + + type.get_key(&data_[0], &handles_[0]); + type.get_key(&data_[1], &handles_[1]); + } + + ~TestState() + { + close_writer(0); + close_writer(1); + } + + void run_test( + DataReader* reader, + const std::vector& steps) + { + for (const TestStep& step : steps) + { + for (const TestCmd& cmd : step.operations) + { + execute(cmd); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + check(reader, 0, step.instance_state[0]); + check(reader, 1, step.instance_state[1]); + } + + } + + void execute( + const TestCmd& cmd) + { + DataWriter* writer = nullptr; + ReturnCode_t ret_code; + + switch (cmd.operation) + { + case TestCmd::CLOSE: + close_writer(cmd.writer_index); + break; + + case TestCmd::DISPOSE: + writer = open_writer(cmd.writer_index); + ret_code = writer->dispose(&data_[cmd.instance_index], handles_[cmd.instance_index]); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret_code); + break; + + case TestCmd::UNREGISTER: + writer = open_writer(cmd.writer_index); + ret_code = writer->unregister_instance(&data_[cmd.instance_index], handles_[cmd.instance_index]); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret_code); + break; + + case TestCmd::WRITE: + writer = open_writer(cmd.writer_index); + ret_code = writer->write(&data_[cmd.instance_index], handles_[cmd.instance_index]); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret_code); + break; + } + } + + void check( + DataReader* reader, + size_t instance_index, + const TestInstanceResult& instance_result) + { + FooSeq values; + SampleInfoSeq infos; + ReturnCode_t ret_code; + + ret_code = reader->read_instance(values, infos, LENGTH_UNLIMITED, handles_[instance_index]); + EXPECT_EQ(ret_code, instance_result.ret_code); + if (ReturnCode_t::RETCODE_OK == ret_code) + { + EXPECT_EQ(instance_result.instance_state, infos[0].instance_state); + EXPECT_EQ(instance_result.view_state, infos[0].view_state); + EXPECT_EQ(instance_result.disposed_generation_count, infos[0].disposed_generation_count); + EXPECT_EQ(instance_result.no_writers_generation_count, infos[0].no_writers_generation_count); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, reader->return_loan(values, infos)); + } + } + + private: + + Topic* topic_; + Publisher* publisher_; + DataWriterQos writer_qos_; + DataWriter* writers_[2] = { nullptr, nullptr }; + + InstanceHandle_t handles_[2]; + FooType data_[2]; + + void close_writer( + size_t index) + { + DataWriter*& writer = writers_[index]; + if (writer != nullptr) + { + publisher_->delete_datawriter(writer); + writer = nullptr; + } + } + + DataWriter* open_writer( + size_t index) + { + DataWriter*& writer = writers_[index]; + if (writer == nullptr) + { + writer = publisher_->create_datawriter(topic_, writer_qos_); + } + return writer; + } + + }; + + static const std::vector steps = + { + { + // Instances have never been written + {}, + { + {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // One writer writes on first instance => that instance should be NEW and ALIVE + { {0, TestCmd::WRITE, 0} }, + { + {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // Same writer writes on first instance => instance becomes NOT_NEW + { {0, TestCmd::WRITE, 0} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // Same writer disposes first instance => instance becomes NOT_ALIVE_DISPOSED + { {0, TestCmd::DISPOSE, 0} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 0}, + {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // First writer writes second instance => NEW and ALIVE + // Second writer writes first instance => NEW and ALIVE + { {0, TestCmd::WRITE, 1}, {1, TestCmd::WRITE, 0} }, + { + {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 0}, + {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // Both writers write on second instance => NOT_NEW and ALIVE + { {0, TestCmd::WRITE, 1}, {1, TestCmd::WRITE, 1} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 0}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // Second writer closes => first instance becomes NOT_ALIVE_NO_WRITERS + { {1, TestCmd::CLOSE, 0} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, 1, 0}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // First writer unregisters second instance => NOT_ALIVE_NO_WRITERS + { {0, TestCmd::UNREGISTER, 1} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, 1, 0}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, 0, 0}, + } + }, + { + // Both writers write both instances + { {0, TestCmd::WRITE, 0}, {1, TestCmd::WRITE, 0}, {0, TestCmd::WRITE, 1}, {1, TestCmd::WRITE, 1} }, + { + {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 1}, + {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 1}, + } + }, + { + // Reading twice should return NOT_NEW + {}, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 1}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 1}, + } + }, + { + // 0 - Unregistering while having another alive writer should not change state + // 1 - Disposing while having another alive writer is always done + { {0, TestCmd::UNREGISTER, 0}, {1, TestCmd::DISPOSE, 1} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 1}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 1}, + } + }, + { + // 0 - Writing and unregistering while having another alive writer should not change state + // 1 - Unregister a disposed instance should not change state + { {0, TestCmd::WRITE, 0}, {0, TestCmd::UNREGISTER, 1}, {1, TestCmd::UNREGISTER, 0} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 1}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 1}, + } + }, + { + // 0 - Closing both writers should return NOT_ALIVE_NO_WRITERS + // 1 - Closing both writers on a disposed instance should not change state + { {0, TestCmd::CLOSE, 0}, {1, TestCmd::CLOSE, 0} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, 1, 1}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 1}, + } + }, + }; + + // Run test once + TestState state(type_, topic_, publisher_); + state.run_test(data_reader_, steps); + + // Taking all data should remove instance information + FooSeq data; + SampleInfoSeq infos; + EXPECT_EQ(ReturnCode_t::RETCODE_OK, data_reader_->take(data, infos)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, data_reader_->return_loan(data, infos)); + + // Run test again + state.run_test(data_reader_, steps); +} + /* * This type fails deserialization on odd samples */ @@ -1883,13 +2188,17 @@ TEST_F(DataReaderTests, check_key_history_wholesomeness_on_unmatch) SampleInfoSeq infos; res = data_reader_->take_instance(samples, infos, LENGTH_UNLIMITED, handle_ok_); + + // If the DataWriter is destroyed only the non-notified samples must be removed + // this operation MUST succeed + ASSERT_EQ(res, ReturnCode_t::RETCODE_OK); + + data_reader_->return_loan(samples, infos); }); // Check if the thread hangs // wait for termination std::this_thread::sleep_for(std::chrono::milliseconds(500)); - // check expected result, if query thread hangs res = ReturnCode_t::RETCODE_OK - ASSERT_NE(res, ReturnCode_t::RETCODE_OK); query.join(); } diff --git a/test/unittest/statistics/dds/CMakeLists.txt b/test/unittest/statistics/dds/CMakeLists.txt index 1eca0b887c0..4ec673f91d4 100644 --- a/test/unittest/statistics/dds/CMakeLists.txt +++ b/test/unittest/statistics/dds/CMakeLists.txt @@ -129,6 +129,7 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS) ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReaderImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/DataReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/Subscriber.cpp diff --git a/test/unittest/utils/ResourceLimitedVectorTests.cpp b/test/unittest/utils/ResourceLimitedVectorTests.cpp index 60bf89b7eb5..98c2a581bad 100644 --- a/test/unittest/utils/ResourceLimitedVectorTests.cpp +++ b/test/unittest/utils/ResourceLimitedVectorTests.cpp @@ -21,14 +21,15 @@ using namespace eprosima::fastrtps; // some implementations of std::vector would enforce power of two capacities constexpr size_t NUM_ITEMS = 32; -class ResourceLimitedVectorTests: public ::testing::Test +class ResourceLimitedVectorTests : public ::testing::Test { - public: - const int testbed[NUM_ITEMS] = - { - 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, - 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32 - }; +public: + + const int testbed[NUM_ITEMS] = + { + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, + 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32 + }; }; TEST_F(ResourceLimitedVectorTests, default_constructor) @@ -59,6 +60,26 @@ TEST_F(ResourceLimitedVectorTests, default_constructor) // Should be empty ASSERT_TRUE(uut.empty()); ASSERT_EQ(uut.capacity(), NUM_ITEMS); + + // Add all items using insert + auto it = uut.cbegin(); + for (int i : testbed) + { + it = uut.insert(it, i); + ASSERT_NE(it, uut.cend()); + it = uut.insert(it, std::move(i)); + ASSERT_NE(it, uut.cend()); + } + + // Vector should be filled + ASSERT_EQ(uut.size(), 2 * NUM_ITEMS); + ASSERT_EQ(uut.capacity(), 2 * NUM_ITEMS); + + uut.clear(); + + // Should be empty but allocated + ASSERT_TRUE(uut.empty()); + ASSERT_EQ(uut.capacity(), 2 * NUM_ITEMS); } TEST_F(ResourceLimitedVectorTests, static_config) @@ -103,7 +124,7 @@ TEST_F(ResourceLimitedVectorTests, static_config) TEST_F(ResourceLimitedVectorTests, prealocated_growing_1_config) { - ResourceLimitedVector uut(ResourceLimitedContainerConfig{ NUM_ITEMS, NUM_ITEMS*2, 1}); + ResourceLimitedVector uut(ResourceLimitedContainerConfig{ NUM_ITEMS, NUM_ITEMS * 2, 1}); ASSERT_TRUE(uut.empty()); @@ -130,8 +151,8 @@ TEST_F(ResourceLimitedVectorTests, prealocated_growing_1_config) } // Vector should be filled - ASSERT_EQ(uut.size(), NUM_ITEMS*2); - ASSERT_EQ(uut.capacity(), NUM_ITEMS*2); + ASSERT_EQ(uut.size(), NUM_ITEMS * 2); + ASSERT_EQ(uut.capacity(), NUM_ITEMS * 2); // Adding more values should return error for (int i : testbed) @@ -227,7 +248,10 @@ TEST_F(ResourceLimitedVectorTests, remove_if) ASSERT_EQ(uut.size(), NUM_ITEMS); ASSERT_EQ(uut.capacity(), NUM_ITEMS); - auto is_odd = [](int i) { return (i & 1) != 0; }; + auto is_odd = [](int i) + { + return (i & 1) != 0; + }; // Remove all odd items and check no errors for (size_t i = 0; i < NUM_ITEMS / 2; i++) @@ -247,7 +271,9 @@ TEST_F(ResourceLimitedVectorTests, remove_if) } -int main(int argc, char **argv) +int main( + int argc, + char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/versions.md b/versions.md index 98da12cffff..b9bec8a1995 100644 --- a/versions.md +++ b/versions.md @@ -7,6 +7,8 @@ Forthcoming API, implies ABI break) * New DataWriter API allowing to wait for acknowledgements for a specific instance (extends DataWriter API, implies ABI break) +* Generation of GUID on entity creation (ABI break on RTPS layer) +* Adding DataReader history with correct implementation of instance_state and view_state (ABI break on RTPS layer) * Version 2.4.0