From 4ca60d596c7d3cdf7c360ae8281b4ed48172c045 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 7 Sep 2021 07:20:07 +0200 Subject: [PATCH 01/10] Refactor DataReaderImpl to use a new DataReaderHistory class (#2177) * Refs 12400. Duplicating SubscriberHistory into DataReaderHistory. Signed-off-by: Miguel Company * Refs 12400. Using DataReaderHistory on DataReaderImpl. Signed-off-by: Miguel Company * Refs 12400. Avoid using TopicAttributes. Signed-off-by: Miguel Company * Refs 12400. Additional cleanup. Signed-off-by: Miguel Company * Refs 12400. Using DDS SampleInfo. Signed-off-by: Miguel Company * Refs 12404. Uncrustify. Signed-off-by: Miguel Company --- src/cpp/CMakeLists.txt | 1 + src/cpp/fastdds/subscriber/DataReaderImpl.cpp | 44 +- src/cpp/fastdds/subscriber/DataReaderImpl.hpp | 5 +- .../DataReaderImpl/ReadTakeCommand.hpp | 4 +- .../subscriber/history/DataReaderHistory.cpp | 626 ++++++++++++++++++ .../subscriber/history/DataReaderHistory.hpp | 249 +++++++ test/unittest/dds/status/CMakeLists.txt | 1 + test/unittest/statistics/dds/CMakeLists.txt | 1 + 8 files changed, 885 insertions(+), 46 deletions(-) create mode 100644 src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp create mode 100644 src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index 16d4f1c8e56..d4842b96978 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -86,6 +86,7 @@ set(${PROJECT_NAME}_source_files fastrtps_deprecated/subscriber/SubscriberImpl.cpp fastrtps_deprecated/subscriber/SubscriberHistory.cpp fastdds/subscriber/DataReader.cpp + fastdds/subscriber/history/DataReaderHistory.cpp fastdds/publisher/DataWriter.cpp fastdds/subscriber/DataReaderImpl.cpp fastdds/publisher/DataWriterImpl.cpp diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 302d2ff798a..5bcd486ca25 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -57,40 +57,6 @@ namespace eprosima { namespace fastdds { namespace dds { -static void sample_info_to_dds ( - const SampleInfo_t& rtps_info, - SampleInfo* dds_info) -{ - dds_info->sample_state = NOT_READ_SAMPLE_STATE; - dds_info->view_state = NOT_NEW_VIEW_STATE; - dds_info->disposed_generation_count = 0; - dds_info->no_writers_generation_count = 1; - dds_info->sample_rank = 0; - dds_info->generation_rank = 0; - dds_info->absoulte_generation_rank = 0; - dds_info->source_timestamp = rtps_info.sourceTimestamp; - dds_info->reception_timestamp = rtps_info.receptionTimestamp; - dds_info->instance_handle = rtps_info.iHandle; - dds_info->publication_handle = fastrtps::rtps::InstanceHandle_t(rtps_info.sample_identity.writer_guid()); - dds_info->sample_identity = rtps_info.sample_identity; - dds_info->related_sample_identity = rtps_info.related_sample_identity; - dds_info->valid_data = rtps_info.sampleKind == eprosima::fastrtps::rtps::ALIVE ? true : false; - - switch (rtps_info.sampleKind) - { - case eprosima::fastrtps::rtps::ALIVE: - dds_info->instance_state = ALIVE_INSTANCE_STATE; - break; - case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED: - dds_info->instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE; - break; - default: - //TODO [ILG] change this if the other kinds ever get implemented - dds_info->instance_state = ALIVE_INSTANCE_STATE; - break; - } -} - static bool collections_have_same_properties( const LoanableCollection& data_values, const SampleInfoSeq& sample_infos) @@ -126,11 +92,7 @@ DataReaderImpl::DataReaderImpl( , topic_(topic) , qos_(&qos == &DATAREADER_QOS_DEFAULT ? subscriber_->get_default_datareader_qos() : qos) #pragma warning (disable : 4355 ) - , history_(topic_attributes(), - type_.get(), - qos_.get_readerqos(subscriber_->get_qos()), - type_->m_typeSize + 3, /* Possible alignment */ - qos_.endpoint().history_memory_policy) + , history_(type, *topic, qos_) , listener_(listener) , reader_listener_(this) , deadline_duration_us_(qos_.deadline().period.to_ns() * 1e-3) @@ -703,10 +665,8 @@ ReturnCode_t DataReaderImpl::get_first_untaken_info( return ReturnCode_t::RETCODE_NOT_ENABLED; } - SampleInfo_t rtps_info; - if (history_.get_first_untaken_info(&rtps_info)) + if (history_.get_first_untaken_info(*info)) { - sample_info_to_dds(rtps_info, info); return ReturnCode_t::RETCODE_OK; } return ReturnCode_t::RETCODE_NO_DATA; diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp index 2a506d661f9..5f84e097467 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp @@ -36,7 +36,6 @@ #include #include -#include #include #include @@ -46,6 +45,8 @@ #include #include +#include + using eprosima::fastrtps::types::ReturnCode_t; namespace eprosima { @@ -331,7 +332,7 @@ class DataReaderImpl DataReaderQos qos_; //!History - fastrtps::SubscriberHistory history_; + detail::DataReaderHistory history_; //!Listener DataReaderListener* listener_ = nullptr; diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp index dbd4a79b6ea..54542d7f5ac 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp @@ -27,7 +27,6 @@ #include #include -#include #include #include @@ -35,6 +34,7 @@ #include #include #include +#include #include #include @@ -51,7 +51,7 @@ namespace detail { struct ReadTakeCommand { using ReturnCode_t = eprosima::fastrtps::types::ReturnCode_t; - using history_type = eprosima::fastrtps::SubscriberHistory; + using history_type = eprosima::fastdds::dds::detail::DataReaderHistory; using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t; using RTPSReader = eprosima::fastrtps::rtps::RTPSReader; using WriterProxy = eprosima::fastrtps::rtps::WriterProxy; diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp new file mode 100644 index 00000000000..bc56f611891 --- /dev/null +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -0,0 +1,626 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DataReaderHistory.cpp + */ + +#include +#include + +#include "DataReaderHistory.hpp" + +#include +#include + +#include +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +using namespace eprosima::fastrtps::rtps; + +using eprosima::fastrtps::KeyedChanges; +using eprosima::fastrtps::RecursiveTimedMutex; + +static void get_sample_info( + SampleInfo& info, + CacheChange_t* change) +{ + info.sample_state = NOT_READ_SAMPLE_STATE; + info.view_state = NOT_NEW_VIEW_STATE; + info.disposed_generation_count = 0; + info.no_writers_generation_count = 1; + info.sample_rank = 0; + info.generation_rank = 0; + info.absoulte_generation_rank = 0; + info.source_timestamp = change->sourceTimestamp; + info.reception_timestamp = change->reader_info.receptionTimestamp; + info.instance_handle = change->instanceHandle; + info.publication_handle = fastrtps::rtps::InstanceHandle_t(change->writerGUID); + info.sample_identity.writer_guid(change->writerGUID); + info.sample_identity.sequence_number(change->sequenceNumber); + info.related_sample_identity = change->write_params.sample_identity(); + info.valid_data = change->kind == eprosima::fastrtps::rtps::ALIVE; + + switch (change->kind) + { + case eprosima::fastrtps::rtps::ALIVE: + info.instance_state = ALIVE_INSTANCE_STATE; + break; + case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED: + info.instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE; + break; + default: + //TODO [ILG] change this if the other kinds ever get implemented + info.instance_state = ALIVE_INSTANCE_STATE; + break; + } +} + +static HistoryAttributes to_history_attributes( + const TypeSupport& type, + const DataReaderQos& qos) +{ + auto initial_samples = qos.resource_limits().allocated_samples; + auto max_samples = qos.resource_limits().max_samples; + + if (qos.history().kind != KEEP_ALL_HISTORY_QOS) + { + max_samples = qos.history().depth; + if (type->m_isGetKeyDefined) + { + max_samples *= qos.resource_limits().max_instances; + } + + initial_samples = std::min(initial_samples, max_samples); + } + + auto mempolicy = qos.endpoint().history_memory_policy; + auto payloadMaxSize = type->m_typeSize + 3; // possible alignment + + return HistoryAttributes(mempolicy, payloadMaxSize, initial_samples, max_samples); +} + +DataReaderHistory::DataReaderHistory( + const TypeSupport& type, + const TopicDescription& topic, + const DataReaderQos& qos) + : ReaderHistory(to_history_attributes(type, qos)) + , history_qos_(qos.history()) + , resource_limited_qos_(qos.resource_limits()) + , topic_name_(topic.get_name()) + , type_name_(topic.get_type_name()) + , has_keys_(type->m_isGetKeyDefined) + , type_(type.get()) + , get_key_object_(nullptr) +{ + if (type_->m_isGetKeyDefined) + { + get_key_object_ = type_->createData(); + } + + if (resource_limited_qos_.max_samples == 0) + { + resource_limited_qos_.max_samples = std::numeric_limits::max(); + } + + if (resource_limited_qos_.max_instances == 0) + { + resource_limited_qos_.max_instances = std::numeric_limits::max(); + } + + if (resource_limited_qos_.max_samples_per_instance == 0) + { + resource_limited_qos_.max_samples_per_instance = std::numeric_limits::max(); + } + + using std::placeholders::_1; + using std::placeholders::_2; + + if (!has_keys_) + { + receive_fn_ = qos.history().kind == KEEP_ALL_HISTORY_QOS ? + std::bind(&DataReaderHistory::received_change_keep_all_no_key, this, _1, _2) : + std::bind(&DataReaderHistory::received_change_keep_last_no_key, this, _1, _2); + } + else + { + receive_fn_ = qos.history().kind == KEEP_ALL_HISTORY_QOS ? + std::bind(&DataReaderHistory::received_change_keep_all_with_key, this, _1, _2) : + std::bind(&DataReaderHistory::received_change_keep_last_with_key, this, _1, _2); + } +} + +DataReaderHistory::~DataReaderHistory() +{ + if (type_->m_isGetKeyDefined) + { + type_->deleteData(get_key_object_); + } +} + +bool DataReaderHistory::received_change( + CacheChange_t* a_change, + size_t unknown_missing_changes_up_to) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + + std::lock_guard guard(*mp_mutex); + return receive_fn_(a_change, unknown_missing_changes_up_to); +} + +bool DataReaderHistory::received_change_keep_all_no_key( + CacheChange_t* a_change, + size_t unknown_missing_changes_up_to) +{ + // TODO(Ricardo) Check + if (m_changes.size() + unknown_missing_changes_up_to < static_cast(resource_limited_qos_.max_samples)) + { + return add_received_change(a_change); + } + + return false; +} + +bool DataReaderHistory::received_change_keep_last_no_key( + CacheChange_t* a_change, + size_t /* unknown_missing_changes_up_to */ ) +{ + bool add = false; + if (m_changes.size() < static_cast(history_qos_.depth)) + { + add = true; + } + else + { + // Try to substitute the oldest sample. + + // As the history should be ordered following the presentation QoS, we can always remove the first one. + add = remove_change_sub(m_changes.at(0)); + } + + if (add) + { + return add_received_change(a_change); + } + + return false; +} + +bool DataReaderHistory::received_change_keep_all_with_key( + CacheChange_t* a_change, + size_t /* unknown_missing_changes_up_to */ ) +{ + // TODO(Miguel C): Should we check unknown_missing_changes_up_to as it is done in received_change_keep_all_no_key? + + t_m_Inst_Caches::iterator vit; + if (find_key_for_change(a_change, vit)) + { + std::vector& instance_changes = vit->second.cache_changes; + if (instance_changes.size() < static_cast(resource_limited_qos_.max_samples_per_instance)) + { + return add_received_change_with_key(a_change, vit->second.cache_changes); + } + + logWarning(SUBSCRIBER, "Change not added due to maximum number of samples per instance"); + } + + return false; +} + +bool DataReaderHistory::received_change_keep_last_with_key( + CacheChange_t* a_change, + size_t /* unknown_missing_changes_up_to */) +{ + t_m_Inst_Caches::iterator vit; + if (find_key_for_change(a_change, vit)) + { + bool add = false; + std::vector& instance_changes = vit->second.cache_changes; + if (instance_changes.size() < static_cast(history_qos_.depth)) + { + add = true; + } + else + { + // Try to substitute the oldest sample. + + // As the instance should be ordered following the presentation QoS, we can always remove the first one. + add = remove_change_sub(instance_changes.at(0)); + } + + if (add) + { + return add_received_change_with_key(a_change, instance_changes); + } + } + + return false; +} + +bool DataReaderHistory::add_received_change( + CacheChange_t* a_change) +{ + if (m_isHistoryFull) + { + // Discarding the sample. + logWarning(SUBSCRIBER, "Attempting to add Data to Full ReaderHistory: " << type_name_); + return false; + } + + if (add_change(a_change)) + { + if (m_changes.size() == static_cast(m_att.maximumReservedCaches)) + { + m_isHistoryFull = true; + } + + logInfo(SUBSCRIBER, type_name_ + << ": Change " << a_change->sequenceNumber << " added from: " + << a_change->writerGUID; ); + + return true; + } + + return false; +} + +bool DataReaderHistory::add_received_change_with_key( + CacheChange_t* a_change, + std::vector& instance_changes) +{ + if (m_isHistoryFull) + { + // Discarting the sample. + logWarning(SUBSCRIBER, "Attempting to add Data to Full ReaderHistory: " << type_name_); + return false; + } + + if (add_change(a_change)) + { + if (m_changes.size() == static_cast(m_att.maximumReservedCaches)) + { + m_isHistoryFull = true; + } + + //ADD TO KEY VECTOR + + // As the instance should be ordered following the presentation QoS, and + // we only support ordering by reception timestamp, we can always add at the end. + instance_changes.push_back(a_change); + + logInfo(SUBSCRIBER, mp_reader->getGuid().entityId + << ": Change " << a_change->sequenceNumber << " added from: " + << a_change->writerGUID << " with KEY: " << a_change->instanceHandle; ); + + return true; + } + + return false; +} + +bool DataReaderHistory::find_key_for_change( + CacheChange_t* a_change, + t_m_Inst_Caches::iterator& map_it) +{ + if (!a_change->instanceHandle.isDefined() && type_ != nullptr) + { + logInfo(SUBSCRIBER, "Getting Key of change with no Key transmitted"); + type_->deserialize(&a_change->serializedPayload, get_key_object_); + bool is_key_protected = false; +#if HAVE_SECURITY + is_key_protected = mp_reader->getAttributes().security_attributes().is_key_protected; +#endif // if HAVE_SECURITY + if (!type_->getKey(get_key_object_, &a_change->instanceHandle, is_key_protected)) + { + return false; + } + } + else if (!a_change->instanceHandle.isDefined()) + { + logWarning(SUBSCRIBER, "NO KEY in topic: " << topic_name_ + << " and no method to obtain it"; ); + return false; + } + + return find_key(a_change, &map_it); +} + +bool DataReaderHistory::get_first_untaken_info( + SampleInfo& info) +{ + std::lock_guard lock(*mp_mutex); + + CacheChange_t* change = nullptr; + WriterProxy* wp = nullptr; + if (mp_reader->nextUntakenCache(&change, &wp)) + { + get_sample_info(info, change); + mp_reader->change_read_by_user(change, wp, false); + return true; + } + + return false; +} + +bool DataReaderHistory::find_key( + CacheChange_t* a_change, + t_m_Inst_Caches::iterator* vit_out) +{ + t_m_Inst_Caches::iterator vit; + vit = keyed_changes_.find(a_change->instanceHandle); + if (vit != keyed_changes_.end()) + { + *vit_out = vit; + return true; + } + + if (keyed_changes_.size() < static_cast(resource_limited_qos_.max_instances)) + { + *vit_out = keyed_changes_.insert(std::make_pair(a_change->instanceHandle, KeyedChanges())).first; + return true; + } + else + { + for (vit = keyed_changes_.begin(); vit != keyed_changes_.end(); ++vit) + { + if (vit->second.cache_changes.size() == 0) + { + keyed_changes_.erase(vit); + *vit_out = keyed_changes_.insert(std::make_pair(a_change->instanceHandle, KeyedChanges())).first; + return true; + } + } + logWarning(SUBSCRIBER, "History has reached the maximum number of instances"); + } + return false; +} + +bool DataReaderHistory::remove_change_sub( + CacheChange_t* change) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + + std::lock_guard guard(*mp_mutex); + if (has_keys_) + { + bool found = false; + t_m_Inst_Caches::iterator vit; + if (find_key(change, &vit)) + { + for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit) + { + if ((*chit)->sequenceNumber == change->sequenceNumber && (*chit)->writerGUID == change->writerGUID) + { + vit->second.cache_changes.erase(chit); + found = true; + break; + } + } + } + if (!found) + { + logError(SUBSCRIBER, "Change not found on this key, something is wrong"); + } + } + + if (remove_change(change)) + { + m_isHistoryFull = false; + return true; + } + + return false; +} + +bool DataReaderHistory::remove_change_sub( + CacheChange_t* change, + iterator& it) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + + std::lock_guard guard(*mp_mutex); + if (has_keys_) + { + bool found = false; + t_m_Inst_Caches::iterator vit; + if (find_key(change, &vit)) + { + for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit) + { + if ((*chit)->sequenceNumber == change->sequenceNumber && (*chit)->writerGUID == change->writerGUID) + { + assert(it == chit); + it = vit->second.cache_changes.erase(chit); + found = true; + break; + } + } + } + if (!found) + { + logError(SUBSCRIBER, "Change not found on this key, something is wrong"); + } + } + + const_iterator chit = find_change_nts(change); + if (chit == changesEnd()) + { + logInfo(RTPS_WRITER_HISTORY, "Trying to remove a change not in history"); + return false; + } + + m_isHistoryFull = false; + iterator ret_it = remove_change_nts(chit); + + if (!has_keys_) + { + it = ret_it; + } + + return true; +} + +bool DataReaderHistory::set_next_deadline( + const InstanceHandle_t& handle, + const std::chrono::steady_clock::time_point& next_deadline_us) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + std::lock_guard guard(*mp_mutex); + + if (!has_keys_) + { + next_deadline_us_ = next_deadline_us; + return true; + } + + if (keyed_changes_.find(handle) == keyed_changes_.end()) + { + return false; + } + + keyed_changes_[handle].next_deadline_us = next_deadline_us; + return true; +} + +bool DataReaderHistory::get_next_deadline( + InstanceHandle_t& handle, + std::chrono::steady_clock::time_point& next_deadline_us) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + std::lock_guard guard(*mp_mutex); + + if (!has_keys_) + { + next_deadline_us = next_deadline_us_; + return true; + } + + auto min = std::min_element(keyed_changes_.begin(), + keyed_changes_.end(), + []( + const std::pair& lhs, + const std::pair& rhs) + { + return lhs.second.next_deadline_us < rhs.second.next_deadline_us; + }); + handle = min->first; + next_deadline_us = min->second.next_deadline_us; + return true; +} + +std::pair DataReaderHistory::lookup_instance( + const InstanceHandle_t& handle, + bool exact) +{ + if (!has_keys_) + { + if (handle.isDefined()) + { + // NO_KEY topics can only return the ficticious instance. + // Execution can only get here for two reasons: + // - Looking for a specific instance (exact = true) + // - Looking for the next instance to the ficticious one (exact = false) + // In both cases, no instance should be returned + return { false, {InstanceHandle_t(), nullptr} }; + } + + if (exact) + { + // Looking for HANDLE_NIL, nothing to return + return { false, {InstanceHandle_t(), nullptr} }; + } + + // Looking for the first instance, return the ficticious one containing all changes + InstanceHandle_t tmp; + tmp.value[0] = 1; + return { true, {tmp, &m_changes} }; + } + + t_m_Inst_Caches::iterator it; + + if (exact) + { + it = keyed_changes_.find(handle); + } + else + { + auto comp = [](const InstanceHandle_t& h, const std::pair& it) + { + return h < it.first; + }; + it = std::upper_bound(keyed_changes_.begin(), keyed_changes_.end(), handle, comp); + } + + if (it != keyed_changes_.end()) + { + return { true, {it->first, &(it->second.cache_changes)} }; + } + return { false, {InstanceHandle_t(), nullptr} }; +} + +ReaderHistory::iterator DataReaderHistory::remove_change_nts( + ReaderHistory::const_iterator removal, + bool release) +{ + CacheChange_t* p_sample = nullptr; + + if ( removal != changesEnd() + && (p_sample = *removal)->instanceHandle.isDefined() + && has_keys_) + { + // clean any references to this CacheChange in the key state collection + auto it = keyed_changes_.find(p_sample->instanceHandle); + + // if keyed and in history must be in the map + assert(it != keyed_changes_.end()); + + auto& c = it->second.cache_changes; + c.erase(std::remove(c.begin(), c.end(), p_sample), c.end()); + } + + // call the base class + return ReaderHistory::remove_change_nts(removal, release); +} + +} // namespace detail +} // namsepace dds +} // namespace fastdds +} // namsepace eprosima diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp new file mode 100644 index 00000000000..236ba1f767b --- /dev/null +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp @@ -0,0 +1,249 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DataReaderHistory.hpp + */ + +#ifndef _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERHISTORY_HPP_ +#define _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERHISTORY_HPP_ + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +/** + * Class DataReaderHistory, container of the different CacheChanges of a DataReader + */ +class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory +{ +public: + + using MemoryManagementPolicy_t = eprosima::fastrtps::rtps::MemoryManagementPolicy_t; + using InstanceHandle_t = eprosima::fastrtps::rtps::InstanceHandle_t; + using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t; + + using instance_info = std::pair*>; + + /** + * Constructor. Requires information about the subscriber. + * @param topic_att TopicAttributes. + * @param type TopicDataType. + * @param qos ReaderQoS policy. + * @param payloadMax Maximum payload size per change. + * @param mempolicy Set whether the payloads ccan dynamically resized or not. + */ + DataReaderHistory( + const TypeSupport& type, + const TopicDescription& topic, + const DataReaderQos& qos); + + ~DataReaderHistory() override; + + /** + * Remove a specific change from the history. + * No Thread Safe + * @param removal iterator to the CacheChange_t to remove. + * @param release defaults to true and hints if the CacheChange_t should return to the pool + * @return iterator to the next CacheChange_t or end iterator. + */ + iterator remove_change_nts( + const_iterator removal, + bool release = true) override; + + /** + * Called when a change is received by the Subscriber. Will add the change to the history. + * @pre Change should not be already present in the history. + * @param[in] change The received change + * @param unknown_missing_changes_up_to Number of missing changes before this one + * @return + */ + bool received_change( + CacheChange_t* change, + size_t unknown_missing_changes_up_to) override; + + /** + * @brief Returns information about the first untaken sample. + * @param [out] info SampleInfo structure to store first untaken sample information. + * @return true if sample info was returned. false if there is no sample to take. + */ + bool get_first_untaken_info( + SampleInfo& info); + + /** + * This method is called to remove a change from the SubscriberHistory. + * @param change Pointer to the CacheChange_t. + * @return True if removed. + */ + bool remove_change_sub( + CacheChange_t* change); + + /** + * This method is called to remove a change from the SubscriberHistory. + * @param [in] change Pointer to the CacheChange_t. + * @param [in,out] it Iterator pointing to change on input. Will point to next valid change on output. + * @return True if removed. + */ + bool remove_change_sub( + CacheChange_t* change, + iterator& it); + + /** + * @brief A method to set the next deadline for the given instance + * @param handle The handle to the instance + * @param next_deadline_us The time point when the deadline will occur + * @return True if the deadline was set correctly + */ + bool set_next_deadline( + const InstanceHandle_t& handle, + const std::chrono::steady_clock::time_point& next_deadline_us); + + /** + * @brief A method to get the next instance handle that will miss the deadline and the time when the deadline will occur + * @param handle The handle to the instance + * @param next_deadline_us The time point when the instance will miss the deadline + * @return True if the deadline was retrieved successfully + */ + bool get_next_deadline( + InstanceHandle_t& handle, + std::chrono::steady_clock::time_point& next_deadline_us); + + /** + * @brief Get the list of changes corresponding to an instance handle. + * @param handle The handle to the instance. + * @param exact Indicates if the handle should match exactly (true) or if the first instance greater than the + * input handle should be returned. + * @return A pair where: + * - @c first is a boolean indicating if an instance was found + * - @c second is a pair where: + * - @c first is the handle of the returned instance + * - @c second is a pointer to a std::vector with the list of changes for the + * returned instance + * + * @remarks When used on a NO_KEY topic, an instance will only be returned when called with + * `handle = HANDLE_NIL` and `exact = false`. + */ + std::pair lookup_instance( + const InstanceHandle_t& handle, + bool exact); + +private: + + using t_m_Inst_Caches = std::map; + + //!Map where keys are instance handles and values vectors of cache changes + t_m_Inst_Caches keyed_changes_; + //!Time point when the next deadline will occur (only used for topics with no key) + std::chrono::steady_clock::time_point next_deadline_us_; + //!HistoryQosPolicy values. + HistoryQosPolicy history_qos_; + //!ResourceLimitsQosPolicy values. + ResourceLimitsQosPolicy resource_limited_qos_; + //!Topic name + fastrtps::string_255 topic_name_; + //!Type name + fastrtps::string_255 type_name_; + //!Whether the type has keys + bool has_keys_; + //!TopicDataType + fastdds::dds::TopicDataType* type_; + + //!Type object to deserialize Key + void* get_key_object_; + + /// Function processing a received change + std::function receive_fn_; + + /** + * @brief Method that finds a key in m_keyedChanges or tries to add it if not found + * @param a_change The change to get the key from + * @param map_it A map iterator to the given key + * @return True if it was found or could be added to the map + */ + bool find_key( + CacheChange_t* a_change, + t_m_Inst_Caches::iterator* map_it); + + /** + * @brief Method that finds a key in m_keyedChanges or tries to add it if not found + * @param a_change The change to get the key from + * @param map_it A map iterator to the given key + * @return True if it was found or could be added to the map + */ + bool find_key_for_change( + CacheChange_t* a_change, + t_m_Inst_Caches::iterator& map_it); + + /** + * @name Variants of incoming change processing. + * Will be called with the history mutex taken. + * @param[in] change The received change + * @param unknown_missing_changes_up_to Number of missing changes before this one + * @return + */ + ///@{ + bool received_change_keep_all_no_key( + CacheChange_t* change, + size_t unknown_missing_changes_up_to); + + bool received_change_keep_last_no_key( + CacheChange_t* change, + size_t unknown_missing_changes_up_to); + + bool received_change_keep_all_with_key( + CacheChange_t* change, + size_t unknown_missing_changes_up_to); + + bool received_change_keep_last_with_key( + CacheChange_t* change, + size_t unknown_missing_changes_up_to); + ///@} + + bool add_received_change( + CacheChange_t* a_change); + + bool add_received_change_with_key( + CacheChange_t* a_change, + std::vector& instance_changes); +}; + +} // namespace detail +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif // _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERHISTORY_HPP_ diff --git a/test/unittest/dds/status/CMakeLists.txt b/test/unittest/dds/status/CMakeLists.txt index 97e3805ca34..cd0ad76ff4c 100644 --- a/test/unittest/dds/status/CMakeLists.txt +++ b/test/unittest/dds/status/CMakeLists.txt @@ -36,6 +36,7 @@ set(LISTENERTESTS_SOURCE ListenerTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/SubscriberImpl.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReaderImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/SubscriberQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/DataReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp diff --git a/test/unittest/statistics/dds/CMakeLists.txt b/test/unittest/statistics/dds/CMakeLists.txt index da809281b49..3d9082b851b 100644 --- a/test/unittest/statistics/dds/CMakeLists.txt +++ b/test/unittest/statistics/dds/CMakeLists.txt @@ -129,6 +129,7 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS) ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReaderImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/DataReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/Subscriber.cpp From 1072b9374aa3003d37e7acedcc0498e4e6002f0d Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 7 Sep 2021 15:23:56 +0200 Subject: [PATCH 02/10] Fixed DataReaderHistory (#2194) --- .../fastdds/subscriber/history/DataReaderHistory.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index bc56f611891..5c1adfe6d33 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -28,6 +28,7 @@ #include #include +#include namespace eprosima { namespace fastdds { @@ -305,10 +306,11 @@ bool DataReaderHistory::add_received_change_with_key( } //ADD TO KEY VECTOR - - // As the instance should be ordered following the presentation QoS, and - // we only support ordering by reception timestamp, we can always add at the end. - instance_changes.push_back(a_change); + eprosima::utilities::collections::sorted_vector_insert(instance_changes, a_change, + [](const CacheChange_t* lhs, const CacheChange_t* rhs) + { + return lhs->sourceTimestamp < rhs->sourceTimestamp; + }); logInfo(SUBSCRIBER, mp_reader->getGuid().entityId << ": Change " << a_change->sequenceNumber << " added from: " From 3d4b37970ead21cda326200202dd97d02c604296 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 20 Oct 2021 13:02:12 +0200 Subject: [PATCH 03/10] DataReader test for sample_info fields (#2193) * Refs 12400. Initial test infrastructure. Signed-off-by: Miguel Company * Refs 12400. Additional tests. Signed-off-by: Miguel Company * Refs 12400. Tests are run twice with a take in the middle. Signed-off-by: Miguel Company * Refs 12400. Uncrustify. Signed-off-by: Miguel Company * Refs 12469. Fixed warnings. Signed-off-by: Miguel Company * Refs 12469. Additional comments. Signed-off-by: Miguel Company --- .../dds/subscriber/DataReaderTests.cpp | 304 ++++++++++++++++++ 1 file changed, 304 insertions(+) diff --git a/test/unittest/dds/subscriber/DataReaderTests.cpp b/test/unittest/dds/subscriber/DataReaderTests.cpp index 4cca1779e83..384839bb9f5 100644 --- a/test/unittest/dds/subscriber/DataReaderTests.cpp +++ b/test/unittest/dds/subscriber/DataReaderTests.cpp @@ -1467,6 +1467,310 @@ TEST_F(DataReaderTests, read_unread) } } +TEST_F(DataReaderTests, sample_info) +{ + DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT; + reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS; + reader_qos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS; + reader_qos.history().kind = KEEP_LAST_HISTORY_QOS; + reader_qos.history().depth = 1; + reader_qos.resource_limits().max_instances = 2; + reader_qos.resource_limits().max_samples_per_instance = 1; + reader_qos.resource_limits().max_samples = 2; + + create_entities(nullptr, reader_qos); + publisher_->delete_datawriter(data_writer_); + data_writer_ = nullptr; + + struct TestCmd + { + enum Operation + { + WRITE, UNREGISTER, DISPOSE, CLOSE + }; + + size_t writer_index; + Operation operation; + size_t instance_index; + }; + + struct TestInstanceResult + { + ReturnCode_t ret_code; + ViewStateKind view_state; + InstanceStateKind instance_state; + int32_t disposed_generation_count; + int32_t no_writers_generation_count; + }; + + struct TestStep + { + std::vector operations; + TestInstanceResult instance_state[2]; + }; + + struct TestState + { + TestState( + TypeSupport& type, + Topic* topic, + Publisher* publisher) + : topic_(topic) + , publisher_(publisher) + { + writer_qos_ = DATAWRITER_QOS_DEFAULT; + writer_qos_.publish_mode().kind = SYNCHRONOUS_PUBLISH_MODE; + writer_qos_.reliability().kind = RELIABLE_RELIABILITY_QOS; + writer_qos_.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS; + writer_qos_.history().kind = KEEP_LAST_HISTORY_QOS; + writer_qos_.history().depth = 1; + writer_qos_.resource_limits().max_instances = 2; + writer_qos_.resource_limits().max_samples_per_instance = 1; + writer_qos_.resource_limits().max_samples = 2; + + data_[0].index(1); + data_[1].index(2); + + type.get_key(&data_[0], &handles_[0]); + type.get_key(&data_[1], &handles_[1]); + } + + ~TestState() + { + close_writer(0); + close_writer(1); + } + + void run_test( + DataReader* reader, + const std::vector& steps) + { + for (const TestStep& step : steps) + { + for (const TestCmd& cmd : step.operations) + { + execute(cmd); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + check(reader, 0, step.instance_state[0]); + check(reader, 1, step.instance_state[1]); + } + + } + + void execute( + const TestCmd& cmd) + { + DataWriter* writer = nullptr; + ReturnCode_t ret_code; + + switch (cmd.operation) + { + case TestCmd::CLOSE: + close_writer(cmd.writer_index); + break; + + case TestCmd::DISPOSE: + writer = open_writer(cmd.writer_index); + ret_code = writer->dispose(&data_[cmd.instance_index], handles_[cmd.instance_index]); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret_code); + break; + + case TestCmd::UNREGISTER: + writer = open_writer(cmd.writer_index); + ret_code = writer->unregister_instance(&data_[cmd.instance_index], handles_[cmd.instance_index]); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret_code); + break; + + case TestCmd::WRITE: + writer = open_writer(cmd.writer_index); + ret_code = writer->write(&data_[cmd.instance_index], handles_[cmd.instance_index]); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret_code); + break; + } + } + + void check( + DataReader* reader, + size_t instance_index, + const TestInstanceResult& instance_result) + { + FooSeq values; + SampleInfoSeq infos; + ReturnCode_t ret_code; + + ret_code = reader->read_instance(values, infos, LENGTH_UNLIMITED, handles_[instance_index]); + EXPECT_EQ(ret_code, instance_result.ret_code); + if (ReturnCode_t::RETCODE_OK == ret_code) + { + EXPECT_EQ(instance_result.instance_state, infos[0].instance_state); + EXPECT_EQ(instance_result.view_state, infos[0].view_state); + EXPECT_EQ(instance_result.disposed_generation_count, infos[0].disposed_generation_count); + EXPECT_EQ(instance_result.no_writers_generation_count, infos[0].no_writers_generation_count); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, reader->return_loan(values, infos)); + } + } + + private: + + Topic* topic_; + Publisher* publisher_; + DataWriterQos writer_qos_; + DataWriter* writers_[2] = { nullptr, nullptr }; + + InstanceHandle_t handles_[2]; + FooType data_[2]; + + void close_writer( + size_t index) + { + DataWriter*& writer = writers_[index]; + if (writer != nullptr) + { + publisher_->delete_datawriter(writer); + writer = nullptr; + } + } + + DataWriter* open_writer( + size_t index) + { + DataWriter*& writer = writers_[index]; + if (writer == nullptr) + { + writer = publisher_->create_datawriter(topic_, writer_qos_); + } + return writer; + } + + }; + + static const std::vector steps = + { + { + // Instances have never been written + {}, + { + {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // One writer writes on first instance => that instance should be NEW and ALIVE + { {0, TestCmd::WRITE, 0} }, + { + {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // Same writer writes on first instance => instance becomes NOT_NEW + { {0, TestCmd::WRITE, 0} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // Same writer disposes first instance => instance becomes NOT_ALIVE_DISPOSED + { {0, TestCmd::DISPOSE, 0} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 0}, + {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // First writer writes second instance => NEW and ALIVE + // Second writer writes first instance => NEW and ALIVE + { {0, TestCmd::WRITE, 1}, {1, TestCmd::WRITE, 0} }, + { + {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 0}, + {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // Both writers write on second instance => NOT_NEW and ALIVE + { {0, TestCmd::WRITE, 1}, {1, TestCmd::WRITE, 1} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 0}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // Second writer closes => first instance becomes NOT_ALIVE_NO_WRITERS + { {1, TestCmd::CLOSE, 0} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, 1, 0}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0}, + } + }, + { + // First writer unregisters second instance => NOT_ALIVE_NO_WRITERS + { {0, TestCmd::UNREGISTER, 1} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, 1, 0}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, 0, 0}, + } + }, + { + // Both writers write both instances + { {0, TestCmd::WRITE, 0}, {1, TestCmd::WRITE, 0}, {0, TestCmd::WRITE, 1}, {1, TestCmd::WRITE, 1} }, + { + {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 1}, + {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 1}, + } + }, + { + // Reading twice should return NOT_NEW + {}, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 1}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 1}, + } + }, + { + // 0 - Unregistering while having another alive writer should not change state + // 1 - Disposing while having another alive writer is always done + { {0, TestCmd::UNREGISTER, 0}, {1, TestCmd::DISPOSE, 1} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 1}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 1}, + } + }, + { + // 0 - Writing and unregistering while having another alive writer should not change state + // 1 - Unregister a disposed instance should not change state + { {0, TestCmd::WRITE, 0}, {0, TestCmd::UNREGISTER, 1}, {1, TestCmd::UNREGISTER, 0} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 1}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 1}, + } + }, + { + // 0 - Closing both writers should return NOT_ALIVE_NO_WRITERS + // 1 - Closing both writers on a disposed instance should not change state + { {0, TestCmd::CLOSE, 0}, {1, TestCmd::CLOSE, 0} }, + { + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, 1, 1}, + {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 1}, + } + }, + }; + + // Run test once + TestState state(type_, topic_, publisher_); + state.run_test(data_reader_, steps); + + // Taking all data should remove instance information + FooSeq data; + SampleInfoSeq infos; + EXPECT_EQ(ReturnCode_t::RETCODE_OK, data_reader_->take(data, infos)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, data_reader_->return_loan(data, infos)); + + // Run test again + state.run_test(data_reader_, steps); +} + /* * This type fails deserialization on odd samples */ From d03d6ea8fe6f5eafeb26778d8ec324c91a4fe2b1 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 20 Dec 2021 09:10:11 +0100 Subject: [PATCH 04/10] Adding implementation for instance_state and view_state (#2298) * Refs 12400. Added DataReaderCacheChange. Signed-off-by: Miguel Company * Refs 12400. Added DataReaderInstance. Signed-off-by: Miguel Company * Refs 12400. DataReaderHistory using new types. Signed-off-by: Miguel Company * Refs 12400. Removing unnecessary method from SubscriberHistory. Signed-off-by: Miguel Company * Refs 12400. ReadTakeCommand receives full instance information. Signed-off-by: Miguel Company * Refs 12400. ReadTakeCommand checks for instance states. Signed-off-by: Miguel Company * Refs 12400. ReadTakeCommand fills sample info from instance data. Signed-off-by: Miguel Company * Refs 12400. Added insert method to ResourceLimitedVector. Signed-off-by: Miguel Company * Refs 12400. DataReaderInstance uses ResourceLimitedVector. Signed-off-by: Miguel Company * Refs 12400. get_first_untaken_info takes sample info from instance data. Signed-off-by: Miguel Company * Refs 12400. Discard received change when older than oldest. Signed-off-by: Miguel Company * Refs 12400. Fixing KEEP_ALL with keys. Signed-off-by: Miguel Company * Refs 12400. Refactor to always use instances. Signed-off-by: Miguel Company * Refs 12400. Basic structure for update instance state. Signed-off-by: Miguel Company * Refs 12400. Adding alive_writers and current_owner to DataReaderInstance. Signed-off-by: Miguel Company * Refs 12400. Implementing writer_alive. Signed-off-by: Miguel Company * Refs 12400. Implementing writer_dispose. Signed-off-by: Miguel Company * Refs 12400. Implementing writer_unregister. Signed-off-by: Miguel Company * Refs 12400. Setting NOT_NEW on returned instances. Signed-off-by: Miguel Company * Refs 12400. Correct return code when returning samples with no data. Signed-off-by: Miguel Company * Refs 12400. Set view_state to NEW when changing instance_state to ALIVE. Signed-off-by: Miguel Company * Refs 12400. Moving generation counts into CacheChange_t. Signed-off-by: Miguel Company * Refs 12400. Assigning generation counts after processing instance state. Signed-off-by: Miguel Company * Refs 12400. Update instance_state when writer becomes not alive. Signed-off-by: Miguel Company * Refs 12400. Clear alive_writers when changing generation. Signed-off-by: Miguel Company * Refs 12400. Add writer_unmatched to ReaderHistory. Signed-off-by: Miguel Company * Refs 12400. NOT_ALIVE_UNREGISTERED should not return valid data. Signed-off-by: Miguel Company * Refs 12400. Set autodispose_unregistered_instances to false on test. Signed-off-by: Miguel Company * Refs 12400. Remove instance when it becomes empty and is not alive. Signed-off-by: Miguel Company * Refs 12400. Refactor into writer_not_alive. Signed-off-by: Miguel Company * Refs 12400. Keeping samples from unmatched writers. Signed-off-by: Miguel Company * Refs 12400. Avoid keeping non-notified samples. Signed-off-by: Miguel Company * Refs 12400. Linters. Signed-off-by: Miguel Company * Refs 12758. Fixing compilation warnings on windows. Signed-off-by: Miguel Barro * Refs 12758. Fixing assertion on WriterProxy logic. If we only notify fragmented DATA reception on completion we should only notify removal of fully assembled samples. Signed-off-by: Miguel Barro * Refs 12758 Fixing DataReaderHistory test that checks DataWriter disposal behaviour. Signed-off-by: Miguel Barro * Refs 12758. Use move semantics. Signed-off-by: Miguel Company * Refs 12758. Use ResourceLimitedVector for writers. Signed-off-by: Miguel Company * Refs 12758. Apply pre-allocation policies. Signed-off-by: Miguel Company * Refs 12758. Uncrustify. Signed-off-by: Miguel Company * Refs 12758. PubSubReader. Account for writer_guid on last_seq checks. Signed-off-by: Miguel Company * Refs 12758. Added can_change_be_added_nts. Signed-off-by: Miguel Company * Refs 12758. Removed unused method. Signed-off-by: Miguel Company * Refs 12758. Always use completed changes for key computation. Signed-off-by: Miguel Company * Refs 12758. Fixed ResourceLimitedVector::insert. Signed-off-by: Miguel Company * Refs 12758. Uncrustify. Signed-off-by: Miguel Company * Refs 12758. Avoid dynamic allocation inside remove_changes_with_pred. Signed-off-by: Miguel Company * Refs 12758. Optimization on DataReaderHistory::remove_change_nts. Signed-off-by: Miguel Company * Refs 12758. Method writer_unmatched documented and improved. Signed-off-by: Miguel Company * Refs 12758. Do not complete changes for non-keyed topics. Signed-off-by: Miguel Company * Refs 12758. Do not remove incomplete changes for keyed topics. Signed-off-by: Miguel Company * Refs 12758. Different removal policy on ReaderHistory and DataReaderHistory. Signed-off-by: Miguel Company * Refs 12758. Fix unused parameter warning. Signed-off-by: Miguel Company * Refs 12758. Removed unused header. Signed-off-by: Miguel Company * Refs 12758. Doxydoc improvements. Signed-off-by: Miguel Company * Refs 12758. Linters. Signed-off-by: Miguel Company Co-authored-by: Miguel Barro --- include/fastdds/dds/subscriber/SampleInfo.hpp | 2 +- include/fastdds/rtps/common/CacheChange.h | 4 + include/fastdds/rtps/history/ReaderHistory.h | 28 + .../fastrtps/subscriber/SubscriberHistory.h | 21 - .../collections/ResourceLimitedVector.hpp | 41 ++ src/cpp/fastdds/subscriber/DataReaderImpl.cpp | 15 +- .../DataReaderImpl/ReadTakeCommand.hpp | 105 ++-- .../history/DataReaderCacheChange.hpp | 39 ++ .../subscriber/history/DataReaderHistory.cpp | 579 ++++++++++-------- .../subscriber/history/DataReaderHistory.hpp | 138 +++-- .../subscriber/history/DataReaderInstance.hpp | 216 +++++++ .../subscriber/SubscriberHistory.cpp | 52 -- src/cpp/rtps/history/ReaderHistory.cpp | 37 +- src/cpp/rtps/reader/StatefulReader.cpp | 52 +- src/cpp/rtps/reader/StatelessReader.cpp | 2 +- src/cpp/utils/SystemInfo.cpp | 3 +- test/blackbox/api/dds-pim/PubSubReader.hpp | 8 +- .../fastdds/rtps/history/ReaderHistory.h | 12 + .../dds/subscriber/DataReaderTests.cpp | 9 +- 19 files changed, 889 insertions(+), 474 deletions(-) create mode 100644 src/cpp/fastdds/subscriber/history/DataReaderCacheChange.hpp create mode 100644 src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp diff --git a/include/fastdds/dds/subscriber/SampleInfo.hpp b/include/fastdds/dds/subscriber/SampleInfo.hpp index 7af134ba44e..8f9e5ba8da5 100644 --- a/include/fastdds/dds/subscriber/SampleInfo.hpp +++ b/include/fastdds/dds/subscriber/SampleInfo.hpp @@ -89,7 +89,7 @@ struct SampleInfo //! the generation difference between the time the sample was received, and the time the most recent sample was received. //! The most recent sample used for the calculation may or may not be in the returned collection - int32_t absoulte_generation_rank; + int32_t absolute_generation_rank; //! time provided by the DataWriter when the sample was written fastrtps::rtps::Time_t source_timestamp; diff --git a/include/fastdds/rtps/common/CacheChange.h b/include/fastdds/rtps/common/CacheChange.h index 556149ad8a9..1f3c260ca8a 100644 --- a/include/fastdds/rtps/common/CacheChange.h +++ b/include/fastdds/rtps/common/CacheChange.h @@ -57,6 +57,10 @@ struct CacheChangeReaderInfo_t { //!Reception TimeStamp (only used in Readers) Time_t receptionTimestamp; + //! Disposed generation of the instance when this entry was added to it + int32_t disposed_generation_count; + //! No-writers generation of the instance when this entry was added to it + int32_t no_writers_generation_count; }; /** diff --git a/include/fastdds/rtps/history/ReaderHistory.h b/include/fastdds/rtps/history/ReaderHistory.h index 2d83ca02bbd..eb2a9fa6c39 100644 --- a/include/fastdds/rtps/history/ReaderHistory.h +++ b/include/fastdds/rtps/history/ReaderHistory.h @@ -152,6 +152,19 @@ class ReaderHistory : public History CacheChange_t** min_change, const GUID_t& writerGuid); + /** + * Called when a writer is unmatched from the reader holding this history. + * + * This method will remove all the changes on the history that came from the writer being unmatched and which have + * not yet been notified to the user. + * + * @param writer_guid GUID of the writer being unmatched. + * @param last_notified_seq Last sequence number from the specified writer that was notified to the user. + */ + RTPS_DllAPI virtual void writer_unmatched( + const GUID_t& writer_guid, + const SequenceNumber_t& last_notified_seq); + protected: RTPS_DllAPI bool do_reserve_cache( @@ -161,6 +174,21 @@ class ReaderHistory : public History RTPS_DllAPI void do_release_cache( CacheChange_t* ch) override; + template + inline void remove_changes_with_pred( + Pred pred) + { + assert(nullptr != mp_reader); + assert(nullptr != mp_mutex); + + std::lock_guard guard(*mp_mutex); + std::vector::iterator new_end = std::remove_if(m_changes.begin(), m_changes.end(), pred); + while (new_end != m_changes.end()) + { + new_end = remove_change_nts(new_end); + } + } + //!Pointer to the reader RTPSReader* mp_reader; diff --git a/include/fastrtps/subscriber/SubscriberHistory.h b/include/fastrtps/subscriber/SubscriberHistory.h index d0d1f1fdf2b..be94ed6a291 100644 --- a/include/fastrtps/subscriber/SubscriberHistory.h +++ b/include/fastrtps/subscriber/SubscriberHistory.h @@ -44,8 +44,6 @@ class SubscriberHistory : public rtps::ReaderHistory { public: - using instance_info = std::pair*>; - /** * Constructor. Requires information about the subscriber. * @param topic_att TopicAttributes. @@ -179,25 +177,6 @@ class SubscriberHistory : public rtps::ReaderHistory rtps::InstanceHandle_t& handle, std::chrono::steady_clock::time_point& next_deadline_us); - /** - * @brief Get the list of changes corresponding to an instance handle. - * @param handle The handle to the instance. - * @param exact Indicates if the handle should match exactly (true) or if the first instance greater than the - * input handle should be returned. - * @return A pair where: - * - @c first is a boolean indicating if an instance was found - * - @c second is a pair where: - * - @c first is the handle of the returned instance - * - @c second is a pointer to a std::vector with the list of changes for the - * returned instance - * - * @remarks When used on a NO_KEY topic, an instance will only be returned when called with - * `handle = HANDLE_NIL` and `exact = false`. - */ - std::pair lookup_instance( - const rtps::InstanceHandle_t& handle, - bool exact); - private: using t_m_Inst_Caches = std::map; diff --git a/include/fastrtps/utils/collections/ResourceLimitedVector.hpp b/include/fastrtps/utils/collections/ResourceLimitedVector.hpp index 91aa80a3c12..56ee45e673b 100644 --- a/include/fastrtps/utils/collections/ResourceLimitedVector.hpp +++ b/include/fastrtps/utils/collections/ResourceLimitedVector.hpp @@ -119,6 +119,47 @@ class ResourceLimitedVector return *this; } + /** + * Insert value before pos. + * + * @param pos iterator before which the content will be inserted. pos may be the end() iterator. + * @param value element value to insert. + * + * @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits. + */ + iterator insert( + const_iterator pos, + const value_type& value) + { + auto dist = std::distance(collection_.cbegin(), pos); + if (!ensure_capacity()) + { + return end(); + } + + return collection_.insert(collection_.cbegin() + dist, value); + } + + /** + * Insert value before pos. + * + * @param pos iterator before which the content will be inserted. pos may be the end() iterator. + * @param value element value to insert. + * + * @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits. + */ + iterator insert( + const_iterator pos, + value_type&& value) + { + if (!ensure_capacity()) + { + return end(); + } + + return collection_.insert(pos, std::move(value)); + } + /** * Add element at the end. * diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 5bcd486ca25..2444a17553e 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -841,10 +841,10 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos( bool DataReaderImpl::on_new_cache_change_added( const CacheChange_t* const change) { + std::lock_guard guard(reader_->getMutex()); + if (qos_.deadline().period != c_TimeInfinite) { - std::unique_lock lock(reader_->getMutex()); - if (!history_.set_next_deadline( change->instanceHandle, steady_clock::now() + duration_cast(deadline_duration_us_))) @@ -862,6 +862,7 @@ bool DataReaderImpl::on_new_cache_change_added( } CacheChange_t* new_change = const_cast(change); + history_.update_instance_nts(new_change); if (qos_.lifespan().duration == c_TimeInfinite) { @@ -917,6 +918,11 @@ void DataReaderImpl::update_subscription_matched_status( subscription_matched_status_.last_publication_handle = status.last_publication_handle; } + if (count_change < 0) + { + history_.writer_not_alive(iHandle2GUID(status.last_publication_handle)); + } + StatusMask notify_status = StatusMask::subscription_matched(); DataReaderListener* listener = get_listener_for(notify_status); if (listener != nullptr) @@ -1184,6 +1190,11 @@ RequestedIncompatibleQosStatus& DataReaderImpl::update_requested_incompatible_qo LivelinessChangedStatus& DataReaderImpl::update_liveliness_status( const fastrtps::LivelinessChangedStatus& status) { + if (0 < status.not_alive_count_change) + { + history_.writer_not_alive(iHandle2GUID(status.last_publication_handle)); + } + liveliness_changed_status_.alive_count = status.alive_count; liveliness_changed_status_.not_alive_count = status.not_alive_count; liveliness_changed_status_.alive_count_change += status.alive_count_change; diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp index 54542d7f5ac..5ba529ebed3 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp @@ -64,7 +64,7 @@ struct ReadTakeCommand SampleInfoSeq& sample_infos, int32_t max_samples, const StateFilter& states, - history_type::instance_info instance, + const history_type::instance_info& instance, bool single_instance = false) : type_(reader.type_) , loan_manager_(reader.loan_manager_) @@ -108,8 +108,8 @@ struct ReadTakeCommand // Traverse changes on current instance bool ret_val = false; LoanableCollection::size_type first_slot = current_slot_; - auto it = instance_.second->begin(); - while (!finished_ && it != instance_.second->end()) + auto it = instance_.second->cache_changes.begin(); + while (!finished_ && it != instance_.second->cache_changes.end()) { CacheChange_t* change = *it; SampleStateKind check; @@ -142,10 +142,9 @@ struct ReadTakeCommand // in the future also if (!is_future_change) { - // Add sample and info to collections ReturnCode_t previous_return_value = return_value_; - bool added = add_sample(change, remove_change); + bool added = add_sample(*it, remove_change); reader_->end_sample_access_nts(change, wp, added); // Check if the payload is dirty @@ -181,6 +180,7 @@ struct ReadTakeCommand if (current_slot_ > first_slot) { + instance_.second->view_state = ViewStateKind::NOT_NEW_VIEW_STATE; ret_val = true; // complete sample infos @@ -208,6 +208,41 @@ struct ReadTakeCommand return return_value_; } + static void generate_info( + SampleInfo& info, + const DataReaderInstance& instance, + const DataReaderCacheChange& item) + { + info.sample_state = item->isRead ? READ_SAMPLE_STATE : NOT_READ_SAMPLE_STATE; + info.instance_state = instance.instance_state; + info.view_state = instance.view_state; + info.disposed_generation_count = item->reader_info.disposed_generation_count; + info.no_writers_generation_count = item->reader_info.no_writers_generation_count; + info.sample_rank = 0; + info.generation_rank = 0; + info.absolute_generation_rank = 0; + info.source_timestamp = item->sourceTimestamp; + info.reception_timestamp = item->reader_info.receptionTimestamp; + info.instance_handle = item->instanceHandle; + info.publication_handle = InstanceHandle_t(item->writerGUID); + info.sample_identity.writer_guid(item->writerGUID); + info.sample_identity.sequence_number(item->sequenceNumber); + info.related_sample_identity = item->write_params.sample_identity(); + info.valid_data = true; + + switch (item->kind) + { + case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED: + case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED: + case eprosima::fastrtps::rtps::NOT_ALIVE_UNREGISTERED: + info.valid_data = false; + break; + case eprosima::fastrtps::rtps::ALIVE: + default: + break; + } + } + private: const TypeSupport& type_; @@ -243,14 +278,15 @@ struct ReadTakeCommand bool is_current_instance_valid() { - // We are not implementing instance_state or view_state yet, so all instances will be considered to have - // a valid state. In the future this should check instance_state against states_.instance_states and - // view_state against states_.view_states - return true; + // Check instance_state against states_.instance_states and view_state against states_.view_states + auto instance_state = instance_.second->instance_state; + auto view_state = instance_.second->view_state; + return (0 != (states_.instance_states & instance_state)) && (0 != (states_.view_states & view_state)); } bool next_instance() { + history_.check_and_remove_instance(instance_); if (single_instance_) { finished_ = true; @@ -270,7 +306,7 @@ struct ReadTakeCommand } bool add_sample( - CacheChange_t* change, + const DataReaderCacheChange& item, bool& deserialization_error) { bool ret_val = false; @@ -284,10 +320,10 @@ struct ReadTakeCommand sample_infos_.length(new_len); // Add information - generate_info(change); + generate_info(item); if (sample_infos_[current_slot_].valid_data) { - if (!deserialize_sample(change)) + if (!deserialize_sample(item)) { // Decrement length of collections data_values_.length(current_slot_); @@ -295,11 +331,10 @@ struct ReadTakeCommand deserialization_error = true; return false; } - - // Mark that some data is available - return_value_ = ReturnCode_t::RETCODE_OK; } + // Mark that some data is available + return_value_ = ReturnCode_t::RETCODE_OK; ++current_slot_; --remaining_samples_; ret_val = true; @@ -330,48 +365,18 @@ struct ReadTakeCommand } void generate_info( - CacheChange_t* change) + const DataReaderCacheChange& item) { // Loan when necessary if (!sample_infos_.has_ownership()) { - SampleInfo* item = info_pool_.get_item(); - assert(item != nullptr); - const_cast(sample_infos_.buffer())[current_slot_] = item; + SampleInfo* pool_item = info_pool_.get_item(); + assert(pool_item != nullptr); + const_cast(sample_infos_.buffer())[current_slot_] = pool_item; } SampleInfo& info = sample_infos_[current_slot_]; - info.sample_state = change->isRead ? READ_SAMPLE_STATE : NOT_READ_SAMPLE_STATE; - info.view_state = NOT_NEW_VIEW_STATE; - info.disposed_generation_count = 0; - info.no_writers_generation_count = 1; - info.sample_rank = 0; - info.generation_rank = 0; - info.absoulte_generation_rank = 0; - info.source_timestamp = change->sourceTimestamp; - info.reception_timestamp = change->reader_info.receptionTimestamp; - info.instance_handle = handle_; - info.publication_handle = InstanceHandle_t(change->writerGUID); - info.sample_identity.writer_guid(change->writerGUID); - info.sample_identity.sequence_number(change->sequenceNumber); - info.related_sample_identity = change->write_params.sample_identity(); - info.valid_data = true; - - switch (change->kind) - { - case eprosima::fastrtps::rtps::ALIVE: - info.instance_state = ALIVE_INSTANCE_STATE; - break; - case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED: - case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED: - info.instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE; - info.valid_data = false; - break; - default: - //TODO [ILG] change this if the other kinds ever get implemented - info.instance_state = ALIVE_INSTANCE_STATE; - break; - } + generate_info(info, *instance_.second, item); } bool check_datasharing_validity( diff --git a/src/cpp/fastdds/subscriber/history/DataReaderCacheChange.hpp b/src/cpp/fastdds/subscriber/history/DataReaderCacheChange.hpp new file mode 100644 index 00000000000..5b13109f721 --- /dev/null +++ b/src/cpp/fastdds/subscriber/history/DataReaderCacheChange.hpp @@ -0,0 +1,39 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DataReaderCacheChange.hpp + */ + +#ifndef _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERCACHECHANGE_HPP_ +#define _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERCACHECHANGE_HPP_ + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +/// A DataReader cache entry +using DataReaderCacheChange = fastrtps::rtps::CacheChange_t*; + +} /* namespace detail */ +} /* namespace dds */ +} /* namespace fastdds */ +} /* namespace eprosima */ + +#endif // _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERCACHECHANGE_HPP_ diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index 5c1adfe6d33..9ca5826bf1c 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -27,6 +27,7 @@ #include #include +#include #include #include @@ -37,44 +38,8 @@ namespace detail { using namespace eprosima::fastrtps::rtps; -using eprosima::fastrtps::KeyedChanges; using eprosima::fastrtps::RecursiveTimedMutex; -static void get_sample_info( - SampleInfo& info, - CacheChange_t* change) -{ - info.sample_state = NOT_READ_SAMPLE_STATE; - info.view_state = NOT_NEW_VIEW_STATE; - info.disposed_generation_count = 0; - info.no_writers_generation_count = 1; - info.sample_rank = 0; - info.generation_rank = 0; - info.absoulte_generation_rank = 0; - info.source_timestamp = change->sourceTimestamp; - info.reception_timestamp = change->reader_info.receptionTimestamp; - info.instance_handle = change->instanceHandle; - info.publication_handle = fastrtps::rtps::InstanceHandle_t(change->writerGUID); - info.sample_identity.writer_guid(change->writerGUID); - info.sample_identity.sequence_number(change->sequenceNumber); - info.related_sample_identity = change->write_params.sample_identity(); - info.valid_data = change->kind == eprosima::fastrtps::rtps::ALIVE; - - switch (change->kind) - { - case eprosima::fastrtps::rtps::ALIVE: - info.instance_state = ALIVE_INSTANCE_STATE; - break; - case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED: - info.instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE; - break; - default: - //TODO [ILG] change this if the other kinds ever get implemented - info.instance_state = ALIVE_INSTANCE_STATE; - break; - } -} - static HistoryAttributes to_history_attributes( const TypeSupport& type, const DataReaderQos& qos) @@ -104,6 +69,7 @@ DataReaderHistory::DataReaderHistory( const TopicDescription& topic, const DataReaderQos& qos) : ReaderHistory(to_history_attributes(type, qos)) + , key_writers_allocation_(qos.reader_resource_limits().matched_publisher_allocation) , history_qos_(qos.history()) , resource_limited_qos_(qos.resource_limits()) , topic_name_(topic.get_name()) @@ -112,11 +78,6 @@ DataReaderHistory::DataReaderHistory( , type_(type.get()) , get_key_object_(nullptr) { - if (type_->m_isGetKeyDefined) - { - get_key_object_ = type_->createData(); - } - if (resource_limited_qos_.max_samples == 0) { resource_limited_qos_.max_samples = std::numeric_limits::max(); @@ -132,20 +93,75 @@ DataReaderHistory::DataReaderHistory( resource_limited_qos_.max_samples_per_instance = std::numeric_limits::max(); } + if (type_->m_isGetKeyDefined) + { + get_key_object_ = type_->createData(); + + if (resource_limited_qos_.max_samples_per_instance < std::numeric_limits::max()) + { + key_changes_allocation_.maximum = resource_limited_qos_.max_samples_per_instance; + } + } + else + { + resource_limited_qos_.max_instances = 1; + resource_limited_qos_.max_samples_per_instance = resource_limited_qos_.max_samples; + key_changes_allocation_.initial = resource_limited_qos_.allocated_samples; + key_changes_allocation_.maximum = resource_limited_qos_.max_samples; + + keyed_changes_.emplace(c_InstanceHandle_Unknown, + DataReaderInstance{ key_changes_allocation_, key_writers_allocation_ }); + } + using std::placeholders::_1; using std::placeholders::_2; + receive_fn_ = qos.history().kind == KEEP_ALL_HISTORY_QOS ? + std::bind(&DataReaderHistory::received_change_keep_all, this, _1, _2) : + std::bind(&DataReaderHistory::received_change_keep_last, this, _1, _2); + + complete_fn_ = qos.history().kind == KEEP_ALL_HISTORY_QOS ? + std::bind(&DataReaderHistory::completed_change_keep_all, this, _1, _2) : + std::bind(&DataReaderHistory::completed_change_keep_last, this, _1, _2); + if (!has_keys_) { - receive_fn_ = qos.history().kind == KEEP_ALL_HISTORY_QOS ? - std::bind(&DataReaderHistory::received_change_keep_all_no_key, this, _1, _2) : - std::bind(&DataReaderHistory::received_change_keep_last_no_key, this, _1, _2); + compute_key_for_change_fn_ = [](CacheChange_t* change) + { + change->instanceHandle = c_InstanceHandle_Unknown; + return true; + }; } else { - receive_fn_ = qos.history().kind == KEEP_ALL_HISTORY_QOS ? - std::bind(&DataReaderHistory::received_change_keep_all_with_key, this, _1, _2) : - std::bind(&DataReaderHistory::received_change_keep_last_with_key, this, _1, _2); + compute_key_for_change_fn_ = + [this](CacheChange_t* a_change) + { + if (a_change->instanceHandle.isDefined()) + { + return true; + } + + if (!a_change->is_fully_assembled()) + { + return false; + } + + if (type_ != nullptr) + { + logInfo(SUBSCRIBER, "Getting Key of change with no Key transmitted"); + type_->deserialize(&a_change->serializedPayload, get_key_object_); + bool is_key_protected = false; +#if HAVE_SECURITY + is_key_protected = mp_reader->getAttributes().security_attributes().is_key_protected; +#endif // if HAVE_SECURITY + return type_->getKey(get_key_object_, &a_change->instanceHandle, is_key_protected); + } + + logWarning(SUBSCRIBER, "NO KEY in topic: " << topic_name_ + << " and no method to obtain it"; ); + return false; + }; } } @@ -157,88 +173,78 @@ DataReaderHistory::~DataReaderHistory() } } -bool DataReaderHistory::received_change( - CacheChange_t* a_change, - size_t unknown_missing_changes_up_to) +bool DataReaderHistory::can_change_be_added_nts( + const GUID_t& writer_guid, + uint32_t total_payload_size, + size_t unknown_missing_changes_up_to, + bool& will_never_be_accepted) const { - if (mp_reader == nullptr || mp_mutex == nullptr) + if (!ReaderHistory::can_change_be_added_nts(writer_guid, total_payload_size, unknown_missing_changes_up_to, + will_never_be_accepted)) { - logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); return false; } - std::lock_guard guard(*mp_mutex); - return receive_fn_(a_change, unknown_missing_changes_up_to); + will_never_be_accepted = false; + return (KEEP_ALL_HISTORY_QOS != history_qos_.kind) || + (m_changes.size() + unknown_missing_changes_up_to < static_cast(resource_limited_qos_.max_samples)); } -bool DataReaderHistory::received_change_keep_all_no_key( +bool DataReaderHistory::received_change( CacheChange_t* a_change, size_t unknown_missing_changes_up_to) { - // TODO(Ricardo) Check - if (m_changes.size() + unknown_missing_changes_up_to < static_cast(resource_limited_qos_.max_samples)) + if (mp_reader == nullptr || mp_mutex == nullptr) { - return add_received_change(a_change); + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; } - return false; + std::lock_guard guard(*mp_mutex); + return receive_fn_(a_change, unknown_missing_changes_up_to); } -bool DataReaderHistory::received_change_keep_last_no_key( +bool DataReaderHistory::received_change_keep_all( CacheChange_t* a_change, - size_t /* unknown_missing_changes_up_to */ ) + size_t unknown_missing_changes_up_to) { - bool add = false; - if (m_changes.size() < static_cast(history_qos_.depth)) - { - add = true; - } - else - { - // Try to substitute the oldest sample. - - // As the history should be ordered following the presentation QoS, we can always remove the first one. - add = remove_change_sub(m_changes.at(0)); - } - - if (add) + if (!compute_key_for_change_fn_(a_change)) { - return add_received_change(a_change); + // Store the sample temporally only in ReaderHistory. When completed it will be stored in SubscriberHistory too. + return add_to_reader_history_if_not_full(a_change); } - return false; -} - -bool DataReaderHistory::received_change_keep_all_with_key( - CacheChange_t* a_change, - size_t /* unknown_missing_changes_up_to */ ) -{ - // TODO(Miguel C): Should we check unknown_missing_changes_up_to as it is done in received_change_keep_all_no_key? - - t_m_Inst_Caches::iterator vit; - if (find_key_for_change(a_change, vit)) + InstanceCollection::iterator vit; + if (find_key(a_change->instanceHandle, vit)) { - std::vector& instance_changes = vit->second.cache_changes; - if (instance_changes.size() < static_cast(resource_limited_qos_.max_samples_per_instance)) + DataReaderInstance::ChangeCollection& instance_changes = vit->second.cache_changes; + size_t total_size = instance_changes.size() + unknown_missing_changes_up_to; + if (total_size < static_cast(resource_limited_qos_.max_samples_per_instance)) { - return add_received_change_with_key(a_change, vit->second.cache_changes); + return add_received_change_with_key(a_change, vit->second); } - logWarning(SUBSCRIBER, "Change not added due to maximum number of samples per instance"); + logInfo(SUBSCRIBER, "Change not added due to maximum number of samples per instance"); } return false; } -bool DataReaderHistory::received_change_keep_last_with_key( +bool DataReaderHistory::received_change_keep_last( CacheChange_t* a_change, size_t /* unknown_missing_changes_up_to */) { - t_m_Inst_Caches::iterator vit; - if (find_key_for_change(a_change, vit)) + if (!compute_key_for_change_fn_(a_change)) + { + // Store the sample temporally only in ReaderHistory. When completed it will be stored in SubscriberHistory too. + return add_to_reader_history_if_not_full(a_change); + } + + InstanceCollection::iterator vit; + if (find_key(a_change->instanceHandle, vit)) { bool add = false; - std::vector& instance_changes = vit->second.cache_changes; + DataReaderInstance::ChangeCollection& instance_changes = vit->second.cache_changes; if (instance_changes.size() < static_cast(history_qos_.depth)) { add = true; @@ -246,107 +252,72 @@ bool DataReaderHistory::received_change_keep_last_with_key( else { // Try to substitute the oldest sample. + CacheChange_t* first_change = instance_changes.at(0); + if (a_change->sourceTimestamp < first_change->sourceTimestamp) + { + // Received change is older than oldest, and should be discarded + return true; + } - // As the instance should be ordered following the presentation QoS, we can always remove the first one. - add = remove_change_sub(instance_changes.at(0)); + // As the instance is ordered by source timestamp, we can always remove the first one. + add = remove_change_sub(first_change); } if (add) { - return add_received_change_with_key(a_change, instance_changes); + return add_received_change_with_key(a_change, vit->second); } } return false; } -bool DataReaderHistory::add_received_change( - CacheChange_t* a_change) +bool DataReaderHistory::add_received_change_with_key( + CacheChange_t* a_change, + DataReaderInstance& instance) { - if (m_isHistoryFull) - { - // Discarding the sample. - logWarning(SUBSCRIBER, "Attempting to add Data to Full ReaderHistory: " << type_name_); - return false; - } - - if (add_change(a_change)) + if (add_to_reader_history_if_not_full(a_change)) { - if (m_changes.size() == static_cast(m_att.maximumReservedCaches)) - { - m_isHistoryFull = true; - } - - logInfo(SUBSCRIBER, type_name_ - << ": Change " << a_change->sequenceNumber << " added from: " - << a_change->writerGUID; ); - + add_to_instance(a_change, instance); return true; } return false; } -bool DataReaderHistory::add_received_change_with_key( - CacheChange_t* a_change, - std::vector& instance_changes) +bool DataReaderHistory::add_to_reader_history_if_not_full( + CacheChange_t* a_change) { if (m_isHistoryFull) { - // Discarting the sample. + // Discarding the sample. logWarning(SUBSCRIBER, "Attempting to add Data to Full ReaderHistory: " << type_name_); return false; } - if (add_change(a_change)) + bool ret_value = add_change(a_change); + if (m_changes.size() == static_cast(m_att.maximumReservedCaches)) { - if (m_changes.size() == static_cast(m_att.maximumReservedCaches)) - { - m_isHistoryFull = true; - } - - //ADD TO KEY VECTOR - eprosima::utilities::collections::sorted_vector_insert(instance_changes, a_change, - [](const CacheChange_t* lhs, const CacheChange_t* rhs) - { - return lhs->sourceTimestamp < rhs->sourceTimestamp; - }); - - logInfo(SUBSCRIBER, mp_reader->getGuid().entityId - << ": Change " << a_change->sequenceNumber << " added from: " - << a_change->writerGUID << " with KEY: " << a_change->instanceHandle; ); - - return true; + m_isHistoryFull = true; } - - return false; + return ret_value; } -bool DataReaderHistory::find_key_for_change( +void DataReaderHistory::add_to_instance( CacheChange_t* a_change, - t_m_Inst_Caches::iterator& map_it) + DataReaderInstance& instance) { - if (!a_change->instanceHandle.isDefined() && type_ != nullptr) - { - logInfo(SUBSCRIBER, "Getting Key of change with no Key transmitted"); - type_->deserialize(&a_change->serializedPayload, get_key_object_); - bool is_key_protected = false; -#if HAVE_SECURITY - is_key_protected = mp_reader->getAttributes().security_attributes().is_key_protected; -#endif // if HAVE_SECURITY - if (!type_->getKey(get_key_object_, &a_change->instanceHandle, is_key_protected)) - { - return false; - } - } - else if (!a_change->instanceHandle.isDefined()) - { - logWarning(SUBSCRIBER, "NO KEY in topic: " << topic_name_ - << " and no method to obtain it"; ); - return false; - } + // ADD TO KEY VECTOR + DataReaderCacheChange item = a_change; + eprosima::utilities::collections::sorted_vector_insert(instance.cache_changes, item, + [](const DataReaderCacheChange& lhs, const DataReaderCacheChange& rhs) + { + return lhs->sourceTimestamp < rhs->sourceTimestamp; + }); - return find_key(a_change, &map_it); + logInfo(SUBSCRIBER, mp_reader->getGuid().entityId + << ": Change " << a_change->sequenceNumber << " added from: " + << a_change->writerGUID << " with KEY: " << a_change->instanceHandle; ); } bool DataReaderHistory::get_first_untaken_info( @@ -358,7 +329,16 @@ bool DataReaderHistory::get_first_untaken_info( WriterProxy* wp = nullptr; if (mp_reader->nextUntakenCache(&change, &wp)) { - get_sample_info(info, change); + auto it = keyed_changes_.find(change->instanceHandle); + assert(it != keyed_changes_.end()); + auto& instance_changes = it->second.cache_changes; + auto item = + std::find_if(instance_changes.cbegin(), instance_changes.cend(), + [change](const DataReaderCacheChange& v) + { + return v == change; + }); + ReadTakeCommand::generate_info(info, it->second, *item); mp_reader->change_read_by_user(change, wp, false); return true; } @@ -367,38 +347,50 @@ bool DataReaderHistory::get_first_untaken_info( } bool DataReaderHistory::find_key( - CacheChange_t* a_change, - t_m_Inst_Caches::iterator* vit_out) + const InstanceHandle_t& handle, + InstanceCollection::iterator& vit_out) { - t_m_Inst_Caches::iterator vit; - vit = keyed_changes_.find(a_change->instanceHandle); + InstanceCollection::iterator vit; + vit = keyed_changes_.find(handle); if (vit != keyed_changes_.end()) { - *vit_out = vit; + vit_out = vit; return true; } if (keyed_changes_.size() < static_cast(resource_limited_qos_.max_instances)) { - *vit_out = keyed_changes_.insert(std::make_pair(a_change->instanceHandle, KeyedChanges())).first; + vit_out = keyed_changes_.emplace(handle, + DataReaderInstance{key_changes_allocation_, key_writers_allocation_}).first; return true; } - else + + for (vit = keyed_changes_.begin(); vit != keyed_changes_.end(); ++vit) { - for (vit = keyed_changes_.begin(); vit != keyed_changes_.end(); ++vit) + if (vit->second.cache_changes.size() == 0) { - if (vit->second.cache_changes.size() == 0) - { - keyed_changes_.erase(vit); - *vit_out = keyed_changes_.insert(std::make_pair(a_change->instanceHandle, KeyedChanges())).first; - return true; - } + keyed_changes_.erase(vit); + vit_out = keyed_changes_.emplace(handle, + DataReaderInstance{ key_changes_allocation_, key_writers_allocation_ }).first; + return true; } - logWarning(SUBSCRIBER, "History has reached the maximum number of instances"); } + + logWarning(SUBSCRIBER, "History has reached the maximum number of instances"); return false; } +void DataReaderHistory::writer_unmatched( + const GUID_t& writer_guid, + const SequenceNumber_t& last_notified_seq) +{ + remove_changes_with_pred( + [&writer_guid, &last_notified_seq](CacheChange_t* ch) + { + return (writer_guid == ch->writerGUID) && (last_notified_seq < ch->sequenceNumber); + }); +} + bool DataReaderHistory::remove_change_sub( CacheChange_t* change) { @@ -409,26 +401,24 @@ bool DataReaderHistory::remove_change_sub( } std::lock_guard guard(*mp_mutex); - if (has_keys_) + bool found = false; + InstanceCollection::iterator vit; + if (find_key(change->instanceHandle, vit)) { - bool found = false; - t_m_Inst_Caches::iterator vit; - if (find_key(change, &vit)) + for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit) { - for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit) + if ((*chit)->sequenceNumber == change->sequenceNumber && + (*chit)->writerGUID == change->writerGUID) { - if ((*chit)->sequenceNumber == change->sequenceNumber && (*chit)->writerGUID == change->writerGUID) - { - vit->second.cache_changes.erase(chit); - found = true; - break; - } + vit->second.cache_changes.erase(chit); + found = true; + break; } } - if (!found) - { - logError(SUBSCRIBER, "Change not found on this key, something is wrong"); - } + } + if (!found) + { + logError(SUBSCRIBER, "Change not found on this key, something is wrong"); } if (remove_change(change)) @@ -442,7 +432,7 @@ bool DataReaderHistory::remove_change_sub( bool DataReaderHistory::remove_change_sub( CacheChange_t* change, - iterator& it) + DataReaderInstance::ChangeCollection::iterator& it) { if (mp_reader == nullptr || mp_mutex == nullptr) { @@ -451,27 +441,25 @@ bool DataReaderHistory::remove_change_sub( } std::lock_guard guard(*mp_mutex); - if (has_keys_) + bool found = false; + InstanceCollection::iterator vit; + if (find_key(change->instanceHandle, vit)) { - bool found = false; - t_m_Inst_Caches::iterator vit; - if (find_key(change, &vit)) + for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit) { - for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit) + if ((*chit)->sequenceNumber == change->sequenceNumber && + (*chit)->writerGUID == change->writerGUID) { - if ((*chit)->sequenceNumber == change->sequenceNumber && (*chit)->writerGUID == change->writerGUID) - { - assert(it == chit); - it = vit->second.cache_changes.erase(chit); - found = true; - break; - } + assert(it == chit); + it = vit->second.cache_changes.erase(chit); + found = true; + break; } } - if (!found) - { - logError(SUBSCRIBER, "Change not found on this key, something is wrong"); - } + } + if (!found) + { + logError(SUBSCRIBER, "Change not found on this key, something is wrong"); } const_iterator chit = find_change_nts(change); @@ -482,12 +470,7 @@ bool DataReaderHistory::remove_change_sub( } m_isHistoryFull = false; - iterator ret_it = remove_change_nts(chit); - - if (!has_keys_) - { - it = ret_it; - } + ReaderHistory::remove_change_nts(chit); return true; } @@ -502,19 +485,13 @@ bool DataReaderHistory::set_next_deadline( return false; } std::lock_guard guard(*mp_mutex); - - if (!has_keys_) - { - next_deadline_us_ = next_deadline_us; - return true; - } - - if (keyed_changes_.find(handle) == keyed_changes_.end()) + auto it = keyed_changes_.find(handle); + if (it == keyed_changes_.end()) { return false; } - keyed_changes_[handle].next_deadline_us = next_deadline_us; + it->second.next_deadline_us = next_deadline_us; return true; } @@ -528,18 +505,11 @@ bool DataReaderHistory::get_next_deadline( return false; } std::lock_guard guard(*mp_mutex); - - if (!has_keys_) - { - next_deadline_us = next_deadline_us_; - return true; - } - auto min = std::min_element(keyed_changes_.begin(), keyed_changes_.end(), []( - const std::pair& lhs, - const std::pair& rhs) + const std::pair& lhs, + const std::pair& rhs) { return lhs.second.next_deadline_us < rhs.second.next_deadline_us; }); @@ -573,10 +543,10 @@ std::pair DataReaderHistory::lookup_inst // Looking for the first instance, return the ficticious one containing all changes InstanceHandle_t tmp; tmp.value[0] = 1; - return { true, {tmp, &m_changes} }; + return { true, {tmp, &keyed_changes_.begin()->second} }; } - t_m_Inst_Caches::iterator it; + InstanceCollection::iterator it; if (exact) { @@ -584,7 +554,7 @@ std::pair DataReaderHistory::lookup_inst } else { - auto comp = [](const InstanceHandle_t& h, const std::pair& it) + auto comp = [](const InstanceHandle_t& h, const std::pair& it) { return h < it.first; }; @@ -593,35 +563,144 @@ std::pair DataReaderHistory::lookup_inst if (it != keyed_changes_.end()) { - return { true, {it->first, &(it->second.cache_changes)} }; + return { true, {it->first, &(it->second)} }; } return { false, {InstanceHandle_t(), nullptr} }; } +void DataReaderHistory::check_and_remove_instance( + DataReaderHistory::instance_info& instance_info) +{ + DataReaderInstance* instance = instance_info.second; + if (instance->cache_changes.empty() && + (InstanceStateKind::ALIVE_INSTANCE_STATE != instance->instance_state) && + instance_info.first.isDefined()) + { + keyed_changes_.erase(instance_info.first); + instance_info.second = nullptr; + } +} + ReaderHistory::iterator DataReaderHistory::remove_change_nts( ReaderHistory::const_iterator removal, bool release) { - CacheChange_t* p_sample = nullptr; - - if ( removal != changesEnd() - && (p_sample = *removal)->instanceHandle.isDefined() - && has_keys_) + if (removal != changesEnd()) { - // clean any references to this CacheChange in the key state collection - auto it = keyed_changes_.find(p_sample->instanceHandle); + CacheChange_t* p_sample = *removal; - // if keyed and in history must be in the map - assert(it != keyed_changes_.end()); + if (!has_keys_ || p_sample->is_fully_assembled()) + { + // clean any references to this CacheChange in the key state collection + auto it = keyed_changes_.find(p_sample->instanceHandle); + + // if keyed and in history must be in the map + assert(it != keyed_changes_.end()); - auto& c = it->second.cache_changes; - c.erase(std::remove(c.begin(), c.end(), p_sample), c.end()); + auto& c = it->second.cache_changes; + c.erase(std::remove(c.begin(), c.end(), p_sample), c.end()); + } } // call the base class return ReaderHistory::remove_change_nts(removal, release); } +bool DataReaderHistory::completed_change( + CacheChange_t* change) +{ + bool ret_value = true; + + if (!change->instanceHandle.isDefined()) + { + InstanceCollection::iterator vit; + ret_value = compute_key_for_change_fn_(change) && find_key(change->instanceHandle, vit); + if (ret_value) + { + ret_value = !change->instanceHandle.isDefined() || complete_fn_(change, vit->second); + } + + if (!ret_value) + { + const_iterator chit = find_change_nts(change); + if (chit != changesEnd()) + { + m_isHistoryFull = false; + remove_change_nts(chit); + } + else + { + logError(SUBSCRIBER, "Change should exist but didn't find it"); + } + } + } + + return ret_value; +} + +bool DataReaderHistory::completed_change_keep_all( + CacheChange_t* change, + DataReaderInstance& instance) +{ + DataReaderInstance::ChangeCollection& instance_changes = instance.cache_changes; + if (instance_changes.size() < static_cast(resource_limited_qos_.max_samples_per_instance)) + { + add_to_instance(change, instance); + return true; + } + + logWarning(SUBSCRIBER, "Change not added due to maximum number of samples per instance"); + return false; +} + +bool DataReaderHistory::completed_change_keep_last( + CacheChange_t* change, + DataReaderInstance& instance) +{ + bool add = false; + DataReaderInstance::ChangeCollection& instance_changes = instance.cache_changes; + if (instance_changes.size() < static_cast(history_qos_.depth)) + { + add = true; + } + else + { + // Try to substitute the oldest sample. + + // As the instance should be ordered following the presentation QoS, we can always remove the first one. + add = remove_change_sub(instance_changes.at(0)); + } + + if (add) + { + add_to_instance(change, instance); + return true; + } + + return false; +} + +void DataReaderHistory::update_instance_nts( + CacheChange_t* const change) +{ + InstanceCollection::iterator vit; + vit = keyed_changes_.find(change->instanceHandle); + + assert(vit != keyed_changes_.end()); + vit->second.update_state(change->kind, change->writerGUID); + change->reader_info.disposed_generation_count = vit->second.disposed_generation_count; + change->reader_info.no_writers_generation_count = vit->second.no_writers_generation_count; +} + +void DataReaderHistory::writer_not_alive( + const fastrtps::rtps::GUID_t& writer_guid) +{ + for (auto& it : keyed_changes_) + { + it.second.writer_removed(writer_guid); + } +} + } // namespace detail } // namsepace dds } // namespace fastdds diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp index 236ba1f767b..629ce5f8f1f 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp @@ -23,7 +23,6 @@ #include #include #include -#include #include #include @@ -34,12 +33,16 @@ #include #include +#include #include +#include #include #include -#include #include +#include + +#include "DataReaderInstance.hpp" namespace eprosima { namespace fastdds { @@ -56,16 +59,16 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory using MemoryManagementPolicy_t = eprosima::fastrtps::rtps::MemoryManagementPolicy_t; using InstanceHandle_t = eprosima::fastrtps::rtps::InstanceHandle_t; using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t; + using GUID_t = eprosima::fastrtps::rtps::GUID_t; + using SequenceNumber_t = eprosima::fastrtps::rtps::SequenceNumber_t; - using instance_info = std::pair*>; + using instance_info = std::pair; /** - * Constructor. Requires information about the subscriber. - * @param topic_att TopicAttributes. - * @param type TopicDataType. - * @param qos ReaderQoS policy. - * @param payloadMax Maximum payload size per change. - * @param mempolicy Set whether the payloads ccan dynamically resized or not. + * Constructor. Requires information about the DataReader. + * @param type Type information. Needed to know if the type is keyed, as long as the maximum serialized size. + * @param topic Topic description. Topic and type name are used on debug messages. + * @param qos DataReaderQoS policy. History related limits are taken from here. */ DataReaderHistory( const TypeSupport& type, @@ -85,6 +88,26 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory const_iterator removal, bool release = true) override; + /** + * Check if a new change can be added to this history. + * + * @param [in] writer_guid GUID of the writer where the change came from. + * @param [in] total_payload_size Total payload size of the incoming change. + * @param [in] unknown_missing_changes_up_to The number of changes from the same writer with a lower sequence + * number that could potentially be received in the future. + * @param [out] will_never_be_accepted When the method returns @c false, this parameter will inform + * whether the change could be accepted in the future or not. + * + * @pre change should not be present in the history + * + * @return Whether a call to received_change will succeed when called with the same arguments. + */ + bool can_change_be_added_nts( + const GUID_t& writer_guid, + uint32_t total_payload_size, + size_t unknown_missing_changes_up_to, + bool& will_never_be_accepted) const override; + /** * Called when a change is received by the Subscriber. Will add the change to the history. * @pre Change should not be already present in the history. @@ -96,6 +119,15 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory CacheChange_t* change, size_t unknown_missing_changes_up_to) override; + /** + * Called when a fragmented change is received completely by the Subscriber. Will find its instance and store it. + * @pre Change should be already present in the history. + * @param[in] change The received change + * @return + */ + bool completed_change( + CacheChange_t* change) override; + /** * @brief Returns information about the first untaken sample. * @param [out] info SampleInfo structure to store first untaken sample information. @@ -120,7 +152,20 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory */ bool remove_change_sub( CacheChange_t* change, - iterator& it); + DataReaderInstance::ChangeCollection::iterator& it); + + /** + * Called when a writer is unmatched from the reader holding this history. + * + * This method will remove all the changes on the history that came from the writer being unmatched and which have + * not yet been notified to the user. + * + * @param writer_guid GUID of the writer being unmatched. + * @param last_notified_seq Last sequence number from the specified writer that was notified to the user. + */ + void writer_unmatched( + const GUID_t& writer_guid, + const SequenceNumber_t& last_notified_seq) override; /** * @brief A method to set the next deadline for the given instance @@ -151,8 +196,7 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory * - @c first is a boolean indicating if an instance was found * - @c second is a pair where: * - @c first is the handle of the returned instance - * - @c second is a pointer to a std::vector with the list of changes for the - * returned instance + * - @c second is a pointer to a DataReaderInstance that holds information about the returned instance * * @remarks When used on a NO_KEY topic, an instance will only be returned when called with * `handle = HANDLE_NIL` and `exact = false`. @@ -161,14 +205,25 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory const InstanceHandle_t& handle, bool exact); + void update_instance_nts( + CacheChange_t* const change); + + void writer_not_alive( + const fastrtps::rtps::GUID_t& writer_guid); + + void check_and_remove_instance( + instance_info& instance_info); + private: - using t_m_Inst_Caches = std::map; + using InstanceCollection = std::map; + //!Resource limits for allocating the array of changes per instance + eprosima::fastrtps::ResourceLimitedContainerConfig key_changes_allocation_; + //!Resource limits for allocating the array of alive writers per instance + eprosima::fastrtps::ResourceLimitedContainerConfig key_writers_allocation_; //!Map where keys are instance handles and values vectors of cache changes - t_m_Inst_Caches keyed_changes_; - //!Time point when the next deadline will occur (only used for topics with no key) - std::chrono::steady_clock::time_point next_deadline_us_; + InstanceCollection keyed_changes_; //!HistoryQosPolicy values. HistoryQosPolicy history_qos_; //!ResourceLimitsQosPolicy values. @@ -187,6 +242,10 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory /// Function processing a received change std::function receive_fn_; + /// Function to compute the instance handle of a received change + std::function compute_key_for_change_fn_; + /// Function processing a completed fragmented change + std::function complete_fn_; /** * @brief Method that finds a key in m_keyedChanges or tries to add it if not found @@ -195,18 +254,8 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory * @return True if it was found or could be added to the map */ bool find_key( - CacheChange_t* a_change, - t_m_Inst_Caches::iterator* map_it); - - /** - * @brief Method that finds a key in m_keyedChanges or tries to add it if not found - * @param a_change The change to get the key from - * @param map_it A map iterator to the given key - * @return True if it was found or could be added to the map - */ - bool find_key_for_change( - CacheChange_t* a_change, - t_m_Inst_Caches::iterator& map_it); + const InstanceHandle_t& handle, + InstanceCollection::iterator& map_it); /** * @name Variants of incoming change processing. @@ -216,29 +265,44 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory * @return */ ///@{ - bool received_change_keep_all_no_key( + bool received_change_keep_all( CacheChange_t* change, size_t unknown_missing_changes_up_to); - bool received_change_keep_last_no_key( + bool received_change_keep_last( CacheChange_t* change, size_t unknown_missing_changes_up_to); + ///@} - bool received_change_keep_all_with_key( + /** + * @name Variants of change reconstruction completion processing. + * Will be called with the history mutex taken. + * @param change The change for which the last missing fragment has been processed. + * @param instance Instance where the change should be added. + * @return true when the change was added to the instance. + * @return false when the change could not be added to the instance and has been removed from the history. + */ + ///@{ + bool completed_change_keep_all( CacheChange_t* change, - size_t unknown_missing_changes_up_to); + DataReaderInstance& instance); - bool received_change_keep_last_with_key( + bool completed_change_keep_last( CacheChange_t* change, - size_t unknown_missing_changes_up_to); + DataReaderInstance& instance); ///@} - bool add_received_change( + bool add_received_change_with_key( + CacheChange_t* a_change, + DataReaderInstance& instance); + + bool add_to_reader_history_if_not_full( CacheChange_t* a_change); - bool add_received_change_with_key( + void add_to_instance( CacheChange_t* a_change, - std::vector& instance_changes); + DataReaderInstance& instance); + }; } // namespace detail diff --git a/src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp b/src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp new file mode 100644 index 00000000000..ed5a4e267ac --- /dev/null +++ b/src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp @@ -0,0 +1,216 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DataReaderInstance.hpp + */ + +#ifndef _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERINSTANCE_HPP_ +#define _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERINSTANCE_HPP_ + +#include +#include + +#include +#include + +#include + +#include "DataReaderCacheChange.hpp" + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +/// Book-keeping information for an instance +struct DataReaderInstance +{ + using ChangeCollection = eprosima::fastrtps::ResourceLimitedVector; + using WriterOwnership = std::pair; + using WriterCollection = eprosima::fastrtps::ResourceLimitedVector; + + //! A vector of DataReader changes belonging to the same instance + ChangeCollection cache_changes; + //! The list of alive writers for this instance + WriterCollection alive_writers; + //! GUID and strength of the current maximum strength writer + WriterOwnership current_owner{ {}, 0 }; + //! The time when the group will miss the deadline + std::chrono::steady_clock::time_point next_deadline_us; + //! Current view state of the instance + ViewStateKind view_state = ViewStateKind::NEW_VIEW_STATE; + //! Current instance state of the instance + InstanceStateKind instance_state = InstanceStateKind::ALIVE_INSTANCE_STATE; + //! Current disposed generation of the instance + int32_t disposed_generation_count = 0; + //! Current no_writers generation of the instance + int32_t no_writers_generation_count = 0; + + DataReaderInstance( + const eprosima::fastrtps::ResourceLimitedContainerConfig& changes_allocation, + const eprosima::fastrtps::ResourceLimitedContainerConfig& writers_allocation) + : cache_changes(changes_allocation) + , alive_writers(writers_allocation) + { + } + + bool update_state( + const fastrtps::rtps::ChangeKind_t change_kind, + const fastrtps::rtps::GUID_t& writer_guid, + const uint32_t ownership_strength = 0) + { + bool ret_val = false; + + switch (change_kind) + { + case fastrtps::rtps::ALIVE: + ret_val = writer_alive(writer_guid, ownership_strength); + break; + + case fastrtps::rtps::NOT_ALIVE_DISPOSED: + ret_val = writer_dispose(writer_guid, ownership_strength); + break; + + case fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED: + ret_val = writer_dispose(writer_guid, ownership_strength); + ret_val |= writer_unregister(writer_guid); + break; + + case fastrtps::rtps::NOT_ALIVE_UNREGISTERED: + ret_val = writer_unregister(writer_guid); + break; + + default: + // TODO (Miguel C): log error / assert + break; + } + + return ret_val; + } + + bool writer_removed( + const fastrtps::rtps::GUID_t& writer_guid) + { + return writer_unregister(writer_guid); + } + +private: + + bool writer_alive( + const fastrtps::rtps::GUID_t& writer_guid, + const uint32_t ownership_strength) + { + bool ret_val = false; + + if (ownership_strength >= current_owner.second) + { + current_owner.first = writer_guid; + current_owner.second = ownership_strength; + + if (InstanceStateKind::NOT_ALIVE_DISPOSED_INSTANCE_STATE == instance_state) + { + ++disposed_generation_count; + alive_writers.clear(); + view_state = ViewStateKind::NEW_VIEW_STATE; + ret_val = true; + } + else if (InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE == instance_state) + { + ++no_writers_generation_count; + alive_writers.clear(); + view_state = ViewStateKind::NEW_VIEW_STATE; + ret_val = true; + } + + writer_set(writer_guid, ownership_strength); + instance_state = InstanceStateKind::ALIVE_INSTANCE_STATE; + } + + return ret_val; + } + + bool writer_dispose( + const fastrtps::rtps::GUID_t& writer_guid, + const uint32_t ownership_strength) + { + bool ret_val = false; + + writer_set(writer_guid, ownership_strength); + if (ownership_strength >= current_owner.second) + { + current_owner.first = writer_guid; + current_owner.second = ownership_strength; + + if (InstanceStateKind::ALIVE_INSTANCE_STATE == instance_state) + { + ret_val = true; + instance_state = InstanceStateKind::NOT_ALIVE_DISPOSED_INSTANCE_STATE; + } + } + + return ret_val; + } + + bool writer_unregister( + const fastrtps::rtps::GUID_t& writer_guid) + { + bool ret_val = false; + + alive_writers.remove_if([&writer_guid](const WriterOwnership& item) + { + return item.first == writer_guid; + }); + + if (writer_guid == current_owner.first) + { + current_owner.second = 0; + current_owner.first = fastrtps::rtps::c_Guid_Unknown; + } + + if (alive_writers.empty() && (InstanceStateKind::ALIVE_INSTANCE_STATE == instance_state)) + { + ret_val = true; + instance_state = InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE; + } + + return ret_val; + } + + void writer_set( + const fastrtps::rtps::GUID_t& writer_guid, + const uint32_t ownership_strength) + { + auto it = std::find_if(alive_writers.begin(), alive_writers.end(), [&writer_guid](const WriterOwnership& item) + { + return item.first == writer_guid; + }); + if (it == alive_writers.end()) + { + alive_writers.emplace_back(writer_guid, ownership_strength); + } + else + { + it->second = ownership_strength; + } + } + +}; + +} /* namespace detail */ +} /* namespace dds */ +} /* namespace fastdds */ +} /* namespace eprosima */ + +#endif // _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERCACHECHANGE_HPP_ diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp index 9de26f80dd5..39a7b4542d6 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp @@ -693,58 +693,6 @@ bool SubscriberHistory::get_next_deadline( return false; } -std::pair SubscriberHistory::lookup_instance( - const InstanceHandle_t& handle, - bool exact) -{ - if (topic_att_.getTopicKind() == NO_KEY) - { - if (handle.isDefined()) - { - // NO_KEY topics can only return the ficticious instance. - // Execution can only get here for two reasons: - // - Looking for a specific instance (exact = true) - // - Looking for the next instance to the ficticious one (exact = false) - // In both cases, no instance should be returned - return { false, {InstanceHandle_t(), nullptr} }; - } - else - { - if (exact) - { - // Looking for HANDLE_NIL, nothing to return - return { false, {InstanceHandle_t(), nullptr} }; - } - - // Looking for the first instance, return the ficticious one containing all changes - InstanceHandle_t tmp; - tmp.value[0] = 1; - return { true, {tmp, &m_changes} }; - } - } - - t_m_Inst_Caches::iterator it; - - if (exact) - { - it = keyed_changes_.find(handle); - } - else - { - auto comp = [](const InstanceHandle_t& h, const std::pair& it) - { - return h < it.first; - }; - it = std::upper_bound(keyed_changes_.begin(), keyed_changes_.end(), handle, comp); - } - - if (it != keyed_changes_.end()) - { - return { true, {it->first, &(it->second.cache_changes)} }; - } - return { false, {InstanceHandle_t(), nullptr} }; -} - ReaderHistory::iterator SubscriberHistory::remove_change_nts( ReaderHistory::const_iterator removal, bool release) diff --git a/src/cpp/rtps/history/ReaderHistory.cpp b/src/cpp/rtps/history/ReaderHistory.cpp index 6c4050d8e4a..a7baba15b44 100644 --- a/src/cpp/rtps/history/ReaderHistory.cpp +++ b/src/cpp/rtps/history/ReaderHistory.cpp @@ -158,38 +158,33 @@ History::iterator ReaderHistory::remove_change_nts( return ret_val; } +void ReaderHistory::writer_unmatched( + const GUID_t& writer_guid, + const SequenceNumber_t& last_notified_seq) +{ + static_cast(last_notified_seq); + remove_changes_with_pred( + [&writer_guid](CacheChange_t* ch) + { + return writer_guid == ch->writerGUID; + }); +} + bool ReaderHistory::remove_changes_with_guid( const GUID_t& a_guid) { - std::vector changes_to_remove; - if (mp_reader == nullptr || mp_mutex == nullptr) { logError(RTPS_READER_HISTORY, "You need to create a Reader with History before removing any changes"); return false; } - { - //Lock scope - std::lock_guard guard(*mp_mutex); - for (std::vector::iterator chit = m_changes.begin(); chit != m_changes.end(); ++chit) + remove_changes_with_pred( + [a_guid](CacheChange_t* ch) { - if ((*chit)->writerGUID == a_guid) - { - changes_to_remove.push_back((*chit)); - } - } - }//End lock scope + return a_guid == ch->writerGUID; + }); - for (std::vector::iterator chit = changes_to_remove.begin(); chit != changes_to_remove.end(); - ++chit) - { - if (!remove_change(*chit)) - { - logError(RTPS_READER_HISTORY, "One of the cachechanged in the GUID removal bulk could not be removed"); - return false; - } - } return true; } diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index 5d08dd09b66..8f0289b1e02 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -285,7 +285,7 @@ bool StatefulReader::matched_writer_remove( if (is_alive_) { //Remove cachechanges belonging to the unmatched writer - mp_history->remove_changes_with_guid(writer_guid); + mp_history->writer_unmatched(writer_guid, get_last_notified(writer_guid)); if (liveliness_lease_duration_ < c_TimeInfinite) { @@ -789,34 +789,23 @@ bool StatefulReader::change_removed_by_history( { std::lock_guard guard(mp_mutex); - if (is_alive_) + if (is_alive_ && a_change->is_fully_assembled()) { if (wp != nullptr || matched_writer_lookup(a_change->writerGUID, &wp)) { - if (a_change->is_fully_assembled()) - { - if (!a_change->isRead && wp->available_changes_max() >= a_change->sequenceNumber) - { - if (0 < total_unread_) - { - --total_unread_; - } - } - - - wp->change_removed_from_history(a_change->sequenceNumber); - } - return true; + wp->change_removed_from_history(a_change->sequenceNumber); } - else + + if (!a_change->isRead && + get_last_notified(a_change->writerGUID) >= a_change->sequenceNumber) { - if (a_change->writerGUID.entityId != m_trustedWriterEntityId) + if (0 < total_unread_) { - // trusted entities messages mean no havoc - logError(RTPS_READER, - " You should always find the WP associated with a change, something is very wrong"); + --total_unread_; } + } + return true; } //Simulate a datasharing notification to process any pending payloads that were waiting due to full history @@ -1143,16 +1132,14 @@ bool StatefulReader::begin_sample_access_nts( const GUID_t& writer_guid = change->writerGUID; is_future_change = false; - if (!matched_writer_lookup(writer_guid, &wp)) - { - return false; - } - - SequenceNumber_t seq; - seq = wp->available_changes_max(); - if (seq < change->sequenceNumber) + if (matched_writer_lookup(writer_guid, &wp)) { - is_future_change = true; + SequenceNumber_t seq; + seq = wp->available_changes_max(); + if (seq < change->sequenceNumber) + { + is_future_change = true; + } } return true; @@ -1171,8 +1158,7 @@ void StatefulReader::change_read_by_user( WriterProxy* writer, bool mark_as_read) { - assert(writer != nullptr); - assert(change->writerGUID == writer->guid()); + assert(!writer || change->writerGUID == writer->guid()); // Mark change as read if (mark_as_read && !change->isRead) @@ -1185,7 +1171,7 @@ void StatefulReader::change_read_by_user( } // If not datasharing, we are done - if (!writer->is_datasharing_writer()) + if (!writer || !writer->is_datasharing_writer()) { return; } diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index 738ba165d83..7f35974be18 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -178,7 +178,7 @@ bool StatelessReader::matched_writer_remove( std::lock_guard guard(mp_mutex); //Remove cachechanges belonging to the unmatched writer - mp_history->remove_changes_with_guid(writer_guid); + mp_history->writer_unmatched(writer_guid, get_last_notified(writer_guid)); if (liveliness_lease_duration_ < c_TimeInfinite) { diff --git a/src/cpp/utils/SystemInfo.cpp b/src/cpp/utils/SystemInfo.cpp index fb98234b3a1..a860540a96d 100644 --- a/src/cpp/utils/SystemInfo.cpp +++ b/src/cpp/utils/SystemInfo.cpp @@ -161,10 +161,11 @@ FileWatchHandle SystemInfo::watch_file( break; } })); -#endif // defined(_WIN32) || defined(__unix__) +#else // defined(_WIN32) || defined(__unix__) static_cast(filename); static_cast(callback); return FileWatchHandle(); +#endif // defined(_WIN32) || defined(__unix__) } void SystemInfo::stop_watching_file( diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index c2df5d695b5..0a27fbdbb40 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -1571,8 +1571,9 @@ class PubSubReader std::unique_lock lock(mutex_); // Check order of changes. - ASSERT_LT(last_seq[info.instance_handle], info.sample_identity.sequence_number()); - last_seq[info.instance_handle] = info.sample_identity.sequence_number(); + LastSeqInfo seq_info{ info.instance_handle, info.sample_identity.writer_guid() }; + ASSERT_LT(last_seq[seq_info], info.sample_identity.sequence_number()); + last_seq[seq_info] = info.sample_identity.sequence_number(); if (info.instance_state == eprosima::fastdds::dds::ALIVE_INSTANCE_STATE) { @@ -1657,7 +1658,8 @@ class PubSubReader unsigned int participant_matched_; std::atomic receiving_; eprosima::fastdds::dds::TypeSupport type_; - std::map last_seq; + using LastSeqInfo = std::pair; + std::map last_seq; size_t current_processed_count_; size_t number_samples_expected_; bool discovery_result_; diff --git a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h index 932e4c58de9..840a9e1bce1 100644 --- a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h +++ b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h @@ -133,10 +133,22 @@ class ReaderHistory return m_changes.erase(removal); } + virtual void writer_unmatched( + const GUID_t& /*writer_guid*/, + const SequenceNumber_t& /*last_notified_seq*/) + { + } + HistoryAttributes m_att; protected: + template + inline void remove_changes_with_pred( + Pred) + { + } + RTPSReader* mp_reader; RecursiveTimedMutex* mp_mutex; std::vector m_changes; diff --git a/test/unittest/dds/subscriber/DataReaderTests.cpp b/test/unittest/dds/subscriber/DataReaderTests.cpp index 384839bb9f5..166f2ac614c 100644 --- a/test/unittest/dds/subscriber/DataReaderTests.cpp +++ b/test/unittest/dds/subscriber/DataReaderTests.cpp @@ -1527,6 +1527,7 @@ TEST_F(DataReaderTests, sample_info) writer_qos_.resource_limits().max_instances = 2; writer_qos_.resource_limits().max_samples_per_instance = 1; writer_qos_.resource_limits().max_samples = 2; + writer_qos_.writer_data_lifecycle().autodispose_unregistered_instances = false; data_[0].index(1); data_[1].index(2); @@ -2104,13 +2105,17 @@ TEST_F(DataReaderTests, check_key_history_wholesomeness_on_unmatch) SampleInfoSeq infos; res = data_reader_->take_instance(samples, infos, LENGTH_UNLIMITED, handle_ok_); + + // If the DataWriter is destroyed only the non-notified samples must be removed + // this operation MUST succeed + ASSERT_EQ(res, ReturnCode_t::RETCODE_OK); + + data_reader_->return_loan(samples, infos); }); // Check if the thread hangs // wait for termination std::this_thread::sleep_for(std::chrono::milliseconds(500)); - // check expected result, if query thread hangs res = ReturnCode_t::RETCODE_OK - ASSERT_NE(res, ReturnCode_t::RETCODE_OK); query.join(); } From 0c9a2ea3c4963f6526a49dd03fa43a0c4a983e64 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 20 Dec 2021 10:50:47 +0100 Subject: [PATCH 05/10] Refs 12400. Adding checks for insert on ResourceLimitedVectorTests. Signed-off-by: Miguel Company --- .../utils/ResourceLimitedVectorTests.cpp | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/test/unittest/utils/ResourceLimitedVectorTests.cpp b/test/unittest/utils/ResourceLimitedVectorTests.cpp index 60bf89b7eb5..f4e75bcb592 100644 --- a/test/unittest/utils/ResourceLimitedVectorTests.cpp +++ b/test/unittest/utils/ResourceLimitedVectorTests.cpp @@ -59,6 +59,26 @@ TEST_F(ResourceLimitedVectorTests, default_constructor) // Should be empty ASSERT_TRUE(uut.empty()); ASSERT_EQ(uut.capacity(), NUM_ITEMS); + + // Add all items using insert + auto it = uut.cbegin(); + for (int i : testbed) + { + it = uut.insert(it, i); + ASSERT_NE(it, uut.cend()); + it = uut.insert(it, std::move(i)); + ASSERT_NE(it, uut.cend()); + } + + // Vector should be filled + ASSERT_EQ(uut.size(), 2*NUM_ITEMS); + ASSERT_EQ(uut.capacity(), 2*NUM_ITEMS); + + uut.clear(); + + // Should be empty but allocated + ASSERT_TRUE(uut.empty()); + ASSERT_EQ(uut.capacity(), 2*NUM_ITEMS); } TEST_F(ResourceLimitedVectorTests, static_config) From 237fd2b3728b116eaa1d40cde7ae087e59732234 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 20 Dec 2021 10:51:30 +0100 Subject: [PATCH 06/10] Refs 12400. Fixed rvalue version of ResourceLimitedVector::insert. Signed-off-by: Miguel Company --- include/fastrtps/utils/collections/ResourceLimitedVector.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/include/fastrtps/utils/collections/ResourceLimitedVector.hpp b/include/fastrtps/utils/collections/ResourceLimitedVector.hpp index 56ee45e673b..cafe1291053 100644 --- a/include/fastrtps/utils/collections/ResourceLimitedVector.hpp +++ b/include/fastrtps/utils/collections/ResourceLimitedVector.hpp @@ -152,12 +152,13 @@ class ResourceLimitedVector const_iterator pos, value_type&& value) { + auto dist = std::distance(collection_.cbegin(), pos); if (!ensure_capacity()) { return end(); } - return collection_.insert(pos, std::move(value)); + return collection_.insert(collection_.cbegin() + dist, std::move(value)); } /** From 9c3b929f76fc9247768859b8a525395e310f26ff Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 20 Dec 2021 10:57:45 +0100 Subject: [PATCH 07/10] Refs 12400. Added feature to versions.md. Signed-off-by: Miguel Company --- versions.md | 1 + 1 file changed, 1 insertion(+) diff --git a/versions.md b/versions.md index 98da12cffff..74b5fc7a8d7 100644 --- a/versions.md +++ b/versions.md @@ -7,6 +7,7 @@ Forthcoming API, implies ABI break) * New DataWriter API allowing to wait for acknowledgements for a specific instance (extends DataWriter API, implies ABI break) +* Adding DataReader history with correct implementation of instance_state and view_state (ABI break on RTPS layer) * Version 2.4.0 From 27036c8f00923a80f00d1ba0794676d4e380e5d8 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 20 Dec 2021 11:38:56 +0100 Subject: [PATCH 08/10] Refs 12400. Linters. Signed-off-by: Miguel Company --- .../utils/ResourceLimitedVectorTests.cpp | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/test/unittest/utils/ResourceLimitedVectorTests.cpp b/test/unittest/utils/ResourceLimitedVectorTests.cpp index f4e75bcb592..98c2a581bad 100644 --- a/test/unittest/utils/ResourceLimitedVectorTests.cpp +++ b/test/unittest/utils/ResourceLimitedVectorTests.cpp @@ -21,14 +21,15 @@ using namespace eprosima::fastrtps; // some implementations of std::vector would enforce power of two capacities constexpr size_t NUM_ITEMS = 32; -class ResourceLimitedVectorTests: public ::testing::Test +class ResourceLimitedVectorTests : public ::testing::Test { - public: - const int testbed[NUM_ITEMS] = - { - 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, - 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32 - }; +public: + + const int testbed[NUM_ITEMS] = + { + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, + 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32 + }; }; TEST_F(ResourceLimitedVectorTests, default_constructor) @@ -71,14 +72,14 @@ TEST_F(ResourceLimitedVectorTests, default_constructor) } // Vector should be filled - ASSERT_EQ(uut.size(), 2*NUM_ITEMS); - ASSERT_EQ(uut.capacity(), 2*NUM_ITEMS); + ASSERT_EQ(uut.size(), 2 * NUM_ITEMS); + ASSERT_EQ(uut.capacity(), 2 * NUM_ITEMS); uut.clear(); // Should be empty but allocated ASSERT_TRUE(uut.empty()); - ASSERT_EQ(uut.capacity(), 2*NUM_ITEMS); + ASSERT_EQ(uut.capacity(), 2 * NUM_ITEMS); } TEST_F(ResourceLimitedVectorTests, static_config) @@ -123,7 +124,7 @@ TEST_F(ResourceLimitedVectorTests, static_config) TEST_F(ResourceLimitedVectorTests, prealocated_growing_1_config) { - ResourceLimitedVector uut(ResourceLimitedContainerConfig{ NUM_ITEMS, NUM_ITEMS*2, 1}); + ResourceLimitedVector uut(ResourceLimitedContainerConfig{ NUM_ITEMS, NUM_ITEMS * 2, 1}); ASSERT_TRUE(uut.empty()); @@ -150,8 +151,8 @@ TEST_F(ResourceLimitedVectorTests, prealocated_growing_1_config) } // Vector should be filled - ASSERT_EQ(uut.size(), NUM_ITEMS*2); - ASSERT_EQ(uut.capacity(), NUM_ITEMS*2); + ASSERT_EQ(uut.size(), NUM_ITEMS * 2); + ASSERT_EQ(uut.capacity(), NUM_ITEMS * 2); // Adding more values should return error for (int i : testbed) @@ -247,7 +248,10 @@ TEST_F(ResourceLimitedVectorTests, remove_if) ASSERT_EQ(uut.size(), NUM_ITEMS); ASSERT_EQ(uut.capacity(), NUM_ITEMS); - auto is_odd = [](int i) { return (i & 1) != 0; }; + auto is_odd = [](int i) + { + return (i & 1) != 0; + }; // Remove all odd items and check no errors for (size_t i = 0; i < NUM_ITEMS / 2; i++) @@ -267,7 +271,9 @@ TEST_F(ResourceLimitedVectorTests, remove_if) } -int main(int argc, char **argv) +int main( + int argc, + char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); From 942a56ad251d4c6ba0539ac5f4095744401ff02d Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 20 Dec 2021 12:56:05 +0100 Subject: [PATCH 09/10] Refs 12400. Fixed inclusion of DataReaderHistory.cpp on CMake files. Signed-off-by: Miguel Company --- src/cpp/CMakeLists.txt | 4 +--- test/unittest/dds/publisher/CMakeLists.txt | 1 + 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index d4842b96978..aa55dd48713 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -85,10 +85,7 @@ set(${PROJECT_NAME}_source_files fastrtps_deprecated/subscriber/Subscriber.cpp fastrtps_deprecated/subscriber/SubscriberImpl.cpp fastrtps_deprecated/subscriber/SubscriberHistory.cpp - fastdds/subscriber/DataReader.cpp - fastdds/subscriber/history/DataReaderHistory.cpp fastdds/publisher/DataWriter.cpp - fastdds/subscriber/DataReaderImpl.cpp fastdds/publisher/DataWriterImpl.cpp fastdds/topic/Topic.cpp fastdds/topic/TopicImpl.cpp @@ -105,6 +102,7 @@ set(${PROJECT_NAME}_source_files fastdds/subscriber/Subscriber.cpp fastdds/subscriber/DataReader.cpp fastdds/subscriber/DataReaderImpl.cpp + fastdds/subscriber/history/DataReaderHistory.cpp fastdds/domain/DomainParticipantFactory.cpp fastdds/domain/DomainParticipantImpl.cpp fastdds/domain/DomainParticipant.cpp diff --git a/test/unittest/dds/publisher/CMakeLists.txt b/test/unittest/dds/publisher/CMakeLists.txt index 384441c717e..78999f9df59 100644 --- a/test/unittest/dds/publisher/CMakeLists.txt +++ b/test/unittest/dds/publisher/CMakeLists.txt @@ -75,6 +75,7 @@ set(DATAWRITERTESTS_SOURCE DataWriterTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReaderImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/DataReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/Subscriber.cpp From 0dec6a42e50d975c7cb96886ffc2677a77d35109 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 20 Dec 2021 14:55:29 +0100 Subject: [PATCH 10/10] Refs 12400. Add missing feature to versions.md. Signed-off-by: Miguel Company --- versions.md | 1 + 1 file changed, 1 insertion(+) diff --git a/versions.md b/versions.md index 74b5fc7a8d7..b9bec8a1995 100644 --- a/versions.md +++ b/versions.md @@ -7,6 +7,7 @@ Forthcoming API, implies ABI break) * New DataWriter API allowing to wait for acknowledgements for a specific instance (extends DataWriter API, implies ABI break) +* Generation of GUID on entity creation (ABI break on RTPS layer) * Adding DataReader history with correct implementation of instance_state and view_state (ABI break on RTPS layer) *