From 9c3a5a49480869b7cd850d596b5a763267cf1c8d Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 7 Sep 2021 07:20:07 +0200 Subject: [PATCH] Refactor DataReaderImpl to use a new DataReaderHistory class (#2177) * Refs 12400. Duplicating SubscriberHistory into DataReaderHistory. Signed-off-by: Miguel Company * Refs 12400. Using DataReaderHistory on DataReaderImpl. Signed-off-by: Miguel Company * Refs 12400. Avoid using TopicAttributes. Signed-off-by: Miguel Company * Refs 12400. Additional cleanup. Signed-off-by: Miguel Company * Refs 12400. Using DDS SampleInfo. Signed-off-by: Miguel Company * Refs 12404. Uncrustify. Signed-off-by: Miguel Company --- src/cpp/CMakeLists.txt | 1 + src/cpp/fastdds/subscriber/DataReaderImpl.cpp | 44 +- src/cpp/fastdds/subscriber/DataReaderImpl.hpp | 5 +- .../DataReaderImpl/ReadTakeCommand.hpp | 4 +- .../subscriber/history/DataReaderHistory.cpp | 626 ++++++++++++++++++ .../subscriber/history/DataReaderHistory.hpp | 249 +++++++ test/unittest/dds/status/CMakeLists.txt | 1 + test/unittest/statistics/dds/CMakeLists.txt | 1 + 8 files changed, 885 insertions(+), 46 deletions(-) create mode 100644 src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp create mode 100644 src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index 1ff26c41486..ef295bdca84 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -84,6 +84,7 @@ set(${PROJECT_NAME}_source_files fastrtps_deprecated/subscriber/SubscriberImpl.cpp fastrtps_deprecated/subscriber/SubscriberHistory.cpp fastdds/subscriber/DataReader.cpp + fastdds/subscriber/history/DataReaderHistory.cpp fastdds/publisher/DataWriter.cpp fastdds/subscriber/DataReaderImpl.cpp fastdds/publisher/DataWriterImpl.cpp diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 7a7d8194894..cde9c58585a 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -57,40 +57,6 @@ namespace eprosima { namespace fastdds { namespace dds { -static void sample_info_to_dds ( - const SampleInfo_t& rtps_info, - SampleInfo* dds_info) -{ - dds_info->sample_state = NOT_READ_SAMPLE_STATE; - dds_info->view_state = NOT_NEW_VIEW_STATE; - dds_info->disposed_generation_count = 0; - dds_info->no_writers_generation_count = 1; - dds_info->sample_rank = 0; - dds_info->generation_rank = 0; - dds_info->absoulte_generation_rank = 0; - dds_info->source_timestamp = rtps_info.sourceTimestamp; - dds_info->reception_timestamp = rtps_info.receptionTimestamp; - dds_info->instance_handle = rtps_info.iHandle; - dds_info->publication_handle = fastrtps::rtps::InstanceHandle_t(rtps_info.sample_identity.writer_guid()); - dds_info->sample_identity = rtps_info.sample_identity; - dds_info->related_sample_identity = rtps_info.related_sample_identity; - dds_info->valid_data = rtps_info.sampleKind == eprosima::fastrtps::rtps::ALIVE ? true : false; - - switch (rtps_info.sampleKind) - { - case eprosima::fastrtps::rtps::ALIVE: - dds_info->instance_state = ALIVE_INSTANCE_STATE; - break; - case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED: - dds_info->instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE; - break; - default: - //TODO [ILG] change this if the other kinds ever get implemented - dds_info->instance_state = ALIVE_INSTANCE_STATE; - break; - } -} - static bool collections_have_same_properties( const LoanableCollection& data_values, const SampleInfoSeq& sample_infos) @@ -126,11 +92,7 @@ DataReaderImpl::DataReaderImpl( , topic_(topic) , qos_(&qos == &DATAREADER_QOS_DEFAULT ? subscriber_->get_default_datareader_qos() : qos) #pragma warning (disable : 4355 ) - , history_(topic_attributes(), - type_.get(), - qos_.get_readerqos(subscriber_->get_qos()), - type_->m_typeSize + 3, /* Possible alignment */ - qos_.endpoint().history_memory_policy) + , history_(type, *topic, qos_) , listener_(listener) , reader_listener_(this) , deadline_duration_us_(qos_.deadline().period.to_ns() * 1e-3) @@ -681,10 +643,8 @@ ReturnCode_t DataReaderImpl::get_first_untaken_info( return ReturnCode_t::RETCODE_NOT_ENABLED; } - SampleInfo_t rtps_info; - if (history_.get_first_untaken_info(&rtps_info)) + if (history_.get_first_untaken_info(*info)) { - sample_info_to_dds(rtps_info, info); return ReturnCode_t::RETCODE_OK; } return ReturnCode_t::RETCODE_NO_DATA; diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp index 4b493417926..55a31ff44c7 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp @@ -36,7 +36,6 @@ #include #include -#include #include #include @@ -46,6 +45,8 @@ #include #include +#include + using eprosima::fastrtps::types::ReturnCode_t; namespace eprosima { @@ -328,7 +329,7 @@ class DataReaderImpl DataReaderQos qos_; //!History - fastrtps::SubscriberHistory history_; + detail::DataReaderHistory history_; //!Listener DataReaderListener* listener_ = nullptr; diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp index dbd4a79b6ea..54542d7f5ac 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp @@ -27,7 +27,6 @@ #include #include -#include #include #include @@ -35,6 +34,7 @@ #include #include #include +#include #include #include @@ -51,7 +51,7 @@ namespace detail { struct ReadTakeCommand { using ReturnCode_t = eprosima::fastrtps::types::ReturnCode_t; - using history_type = eprosima::fastrtps::SubscriberHistory; + using history_type = eprosima::fastdds::dds::detail::DataReaderHistory; using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t; using RTPSReader = eprosima::fastrtps::rtps::RTPSReader; using WriterProxy = eprosima::fastrtps::rtps::WriterProxy; diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp new file mode 100644 index 00000000000..bc56f611891 --- /dev/null +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -0,0 +1,626 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DataReaderHistory.cpp + */ + +#include +#include + +#include "DataReaderHistory.hpp" + +#include +#include + +#include +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +using namespace eprosima::fastrtps::rtps; + +using eprosima::fastrtps::KeyedChanges; +using eprosima::fastrtps::RecursiveTimedMutex; + +static void get_sample_info( + SampleInfo& info, + CacheChange_t* change) +{ + info.sample_state = NOT_READ_SAMPLE_STATE; + info.view_state = NOT_NEW_VIEW_STATE; + info.disposed_generation_count = 0; + info.no_writers_generation_count = 1; + info.sample_rank = 0; + info.generation_rank = 0; + info.absoulte_generation_rank = 0; + info.source_timestamp = change->sourceTimestamp; + info.reception_timestamp = change->reader_info.receptionTimestamp; + info.instance_handle = change->instanceHandle; + info.publication_handle = fastrtps::rtps::InstanceHandle_t(change->writerGUID); + info.sample_identity.writer_guid(change->writerGUID); + info.sample_identity.sequence_number(change->sequenceNumber); + info.related_sample_identity = change->write_params.sample_identity(); + info.valid_data = change->kind == eprosima::fastrtps::rtps::ALIVE; + + switch (change->kind) + { + case eprosima::fastrtps::rtps::ALIVE: + info.instance_state = ALIVE_INSTANCE_STATE; + break; + case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED: + info.instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE; + break; + default: + //TODO [ILG] change this if the other kinds ever get implemented + info.instance_state = ALIVE_INSTANCE_STATE; + break; + } +} + +static HistoryAttributes to_history_attributes( + const TypeSupport& type, + const DataReaderQos& qos) +{ + auto initial_samples = qos.resource_limits().allocated_samples; + auto max_samples = qos.resource_limits().max_samples; + + if (qos.history().kind != KEEP_ALL_HISTORY_QOS) + { + max_samples = qos.history().depth; + if (type->m_isGetKeyDefined) + { + max_samples *= qos.resource_limits().max_instances; + } + + initial_samples = std::min(initial_samples, max_samples); + } + + auto mempolicy = qos.endpoint().history_memory_policy; + auto payloadMaxSize = type->m_typeSize + 3; // possible alignment + + return HistoryAttributes(mempolicy, payloadMaxSize, initial_samples, max_samples); +} + +DataReaderHistory::DataReaderHistory( + const TypeSupport& type, + const TopicDescription& topic, + const DataReaderQos& qos) + : ReaderHistory(to_history_attributes(type, qos)) + , history_qos_(qos.history()) + , resource_limited_qos_(qos.resource_limits()) + , topic_name_(topic.get_name()) + , type_name_(topic.get_type_name()) + , has_keys_(type->m_isGetKeyDefined) + , type_(type.get()) + , get_key_object_(nullptr) +{ + if (type_->m_isGetKeyDefined) + { + get_key_object_ = type_->createData(); + } + + if (resource_limited_qos_.max_samples == 0) + { + resource_limited_qos_.max_samples = std::numeric_limits::max(); + } + + if (resource_limited_qos_.max_instances == 0) + { + resource_limited_qos_.max_instances = std::numeric_limits::max(); + } + + if (resource_limited_qos_.max_samples_per_instance == 0) + { + resource_limited_qos_.max_samples_per_instance = std::numeric_limits::max(); + } + + using std::placeholders::_1; + using std::placeholders::_2; + + if (!has_keys_) + { + receive_fn_ = qos.history().kind == KEEP_ALL_HISTORY_QOS ? + std::bind(&DataReaderHistory::received_change_keep_all_no_key, this, _1, _2) : + std::bind(&DataReaderHistory::received_change_keep_last_no_key, this, _1, _2); + } + else + { + receive_fn_ = qos.history().kind == KEEP_ALL_HISTORY_QOS ? + std::bind(&DataReaderHistory::received_change_keep_all_with_key, this, _1, _2) : + std::bind(&DataReaderHistory::received_change_keep_last_with_key, this, _1, _2); + } +} + +DataReaderHistory::~DataReaderHistory() +{ + if (type_->m_isGetKeyDefined) + { + type_->deleteData(get_key_object_); + } +} + +bool DataReaderHistory::received_change( + CacheChange_t* a_change, + size_t unknown_missing_changes_up_to) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + + std::lock_guard guard(*mp_mutex); + return receive_fn_(a_change, unknown_missing_changes_up_to); +} + +bool DataReaderHistory::received_change_keep_all_no_key( + CacheChange_t* a_change, + size_t unknown_missing_changes_up_to) +{ + // TODO(Ricardo) Check + if (m_changes.size() + unknown_missing_changes_up_to < static_cast(resource_limited_qos_.max_samples)) + { + return add_received_change(a_change); + } + + return false; +} + +bool DataReaderHistory::received_change_keep_last_no_key( + CacheChange_t* a_change, + size_t /* unknown_missing_changes_up_to */ ) +{ + bool add = false; + if (m_changes.size() < static_cast(history_qos_.depth)) + { + add = true; + } + else + { + // Try to substitute the oldest sample. + + // As the history should be ordered following the presentation QoS, we can always remove the first one. + add = remove_change_sub(m_changes.at(0)); + } + + if (add) + { + return add_received_change(a_change); + } + + return false; +} + +bool DataReaderHistory::received_change_keep_all_with_key( + CacheChange_t* a_change, + size_t /* unknown_missing_changes_up_to */ ) +{ + // TODO(Miguel C): Should we check unknown_missing_changes_up_to as it is done in received_change_keep_all_no_key? + + t_m_Inst_Caches::iterator vit; + if (find_key_for_change(a_change, vit)) + { + std::vector& instance_changes = vit->second.cache_changes; + if (instance_changes.size() < static_cast(resource_limited_qos_.max_samples_per_instance)) + { + return add_received_change_with_key(a_change, vit->second.cache_changes); + } + + logWarning(SUBSCRIBER, "Change not added due to maximum number of samples per instance"); + } + + return false; +} + +bool DataReaderHistory::received_change_keep_last_with_key( + CacheChange_t* a_change, + size_t /* unknown_missing_changes_up_to */) +{ + t_m_Inst_Caches::iterator vit; + if (find_key_for_change(a_change, vit)) + { + bool add = false; + std::vector& instance_changes = vit->second.cache_changes; + if (instance_changes.size() < static_cast(history_qos_.depth)) + { + add = true; + } + else + { + // Try to substitute the oldest sample. + + // As the instance should be ordered following the presentation QoS, we can always remove the first one. + add = remove_change_sub(instance_changes.at(0)); + } + + if (add) + { + return add_received_change_with_key(a_change, instance_changes); + } + } + + return false; +} + +bool DataReaderHistory::add_received_change( + CacheChange_t* a_change) +{ + if (m_isHistoryFull) + { + // Discarding the sample. + logWarning(SUBSCRIBER, "Attempting to add Data to Full ReaderHistory: " << type_name_); + return false; + } + + if (add_change(a_change)) + { + if (m_changes.size() == static_cast(m_att.maximumReservedCaches)) + { + m_isHistoryFull = true; + } + + logInfo(SUBSCRIBER, type_name_ + << ": Change " << a_change->sequenceNumber << " added from: " + << a_change->writerGUID; ); + + return true; + } + + return false; +} + +bool DataReaderHistory::add_received_change_with_key( + CacheChange_t* a_change, + std::vector& instance_changes) +{ + if (m_isHistoryFull) + { + // Discarting the sample. + logWarning(SUBSCRIBER, "Attempting to add Data to Full ReaderHistory: " << type_name_); + return false; + } + + if (add_change(a_change)) + { + if (m_changes.size() == static_cast(m_att.maximumReservedCaches)) + { + m_isHistoryFull = true; + } + + //ADD TO KEY VECTOR + + // As the instance should be ordered following the presentation QoS, and + // we only support ordering by reception timestamp, we can always add at the end. + instance_changes.push_back(a_change); + + logInfo(SUBSCRIBER, mp_reader->getGuid().entityId + << ": Change " << a_change->sequenceNumber << " added from: " + << a_change->writerGUID << " with KEY: " << a_change->instanceHandle; ); + + return true; + } + + return false; +} + +bool DataReaderHistory::find_key_for_change( + CacheChange_t* a_change, + t_m_Inst_Caches::iterator& map_it) +{ + if (!a_change->instanceHandle.isDefined() && type_ != nullptr) + { + logInfo(SUBSCRIBER, "Getting Key of change with no Key transmitted"); + type_->deserialize(&a_change->serializedPayload, get_key_object_); + bool is_key_protected = false; +#if HAVE_SECURITY + is_key_protected = mp_reader->getAttributes().security_attributes().is_key_protected; +#endif // if HAVE_SECURITY + if (!type_->getKey(get_key_object_, &a_change->instanceHandle, is_key_protected)) + { + return false; + } + } + else if (!a_change->instanceHandle.isDefined()) + { + logWarning(SUBSCRIBER, "NO KEY in topic: " << topic_name_ + << " and no method to obtain it"; ); + return false; + } + + return find_key(a_change, &map_it); +} + +bool DataReaderHistory::get_first_untaken_info( + SampleInfo& info) +{ + std::lock_guard lock(*mp_mutex); + + CacheChange_t* change = nullptr; + WriterProxy* wp = nullptr; + if (mp_reader->nextUntakenCache(&change, &wp)) + { + get_sample_info(info, change); + mp_reader->change_read_by_user(change, wp, false); + return true; + } + + return false; +} + +bool DataReaderHistory::find_key( + CacheChange_t* a_change, + t_m_Inst_Caches::iterator* vit_out) +{ + t_m_Inst_Caches::iterator vit; + vit = keyed_changes_.find(a_change->instanceHandle); + if (vit != keyed_changes_.end()) + { + *vit_out = vit; + return true; + } + + if (keyed_changes_.size() < static_cast(resource_limited_qos_.max_instances)) + { + *vit_out = keyed_changes_.insert(std::make_pair(a_change->instanceHandle, KeyedChanges())).first; + return true; + } + else + { + for (vit = keyed_changes_.begin(); vit != keyed_changes_.end(); ++vit) + { + if (vit->second.cache_changes.size() == 0) + { + keyed_changes_.erase(vit); + *vit_out = keyed_changes_.insert(std::make_pair(a_change->instanceHandle, KeyedChanges())).first; + return true; + } + } + logWarning(SUBSCRIBER, "History has reached the maximum number of instances"); + } + return false; +} + +bool DataReaderHistory::remove_change_sub( + CacheChange_t* change) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + + std::lock_guard guard(*mp_mutex); + if (has_keys_) + { + bool found = false; + t_m_Inst_Caches::iterator vit; + if (find_key(change, &vit)) + { + for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit) + { + if ((*chit)->sequenceNumber == change->sequenceNumber && (*chit)->writerGUID == change->writerGUID) + { + vit->second.cache_changes.erase(chit); + found = true; + break; + } + } + } + if (!found) + { + logError(SUBSCRIBER, "Change not found on this key, something is wrong"); + } + } + + if (remove_change(change)) + { + m_isHistoryFull = false; + return true; + } + + return false; +} + +bool DataReaderHistory::remove_change_sub( + CacheChange_t* change, + iterator& it) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + + std::lock_guard guard(*mp_mutex); + if (has_keys_) + { + bool found = false; + t_m_Inst_Caches::iterator vit; + if (find_key(change, &vit)) + { + for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit) + { + if ((*chit)->sequenceNumber == change->sequenceNumber && (*chit)->writerGUID == change->writerGUID) + { + assert(it == chit); + it = vit->second.cache_changes.erase(chit); + found = true; + break; + } + } + } + if (!found) + { + logError(SUBSCRIBER, "Change not found on this key, something is wrong"); + } + } + + const_iterator chit = find_change_nts(change); + if (chit == changesEnd()) + { + logInfo(RTPS_WRITER_HISTORY, "Trying to remove a change not in history"); + return false; + } + + m_isHistoryFull = false; + iterator ret_it = remove_change_nts(chit); + + if (!has_keys_) + { + it = ret_it; + } + + return true; +} + +bool DataReaderHistory::set_next_deadline( + const InstanceHandle_t& handle, + const std::chrono::steady_clock::time_point& next_deadline_us) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + std::lock_guard guard(*mp_mutex); + + if (!has_keys_) + { + next_deadline_us_ = next_deadline_us; + return true; + } + + if (keyed_changes_.find(handle) == keyed_changes_.end()) + { + return false; + } + + keyed_changes_[handle].next_deadline_us = next_deadline_us; + return true; +} + +bool DataReaderHistory::get_next_deadline( + InstanceHandle_t& handle, + std::chrono::steady_clock::time_point& next_deadline_us) +{ + if (mp_reader == nullptr || mp_mutex == nullptr) + { + logError(SUBSCRIBER, "You need to create a Reader with this History before using it"); + return false; + } + std::lock_guard guard(*mp_mutex); + + if (!has_keys_) + { + next_deadline_us = next_deadline_us_; + return true; + } + + auto min = std::min_element(keyed_changes_.begin(), + keyed_changes_.end(), + []( + const std::pair& lhs, + const std::pair& rhs) + { + return lhs.second.next_deadline_us < rhs.second.next_deadline_us; + }); + handle = min->first; + next_deadline_us = min->second.next_deadline_us; + return true; +} + +std::pair DataReaderHistory::lookup_instance( + const InstanceHandle_t& handle, + bool exact) +{ + if (!has_keys_) + { + if (handle.isDefined()) + { + // NO_KEY topics can only return the ficticious instance. + // Execution can only get here for two reasons: + // - Looking for a specific instance (exact = true) + // - Looking for the next instance to the ficticious one (exact = false) + // In both cases, no instance should be returned + return { false, {InstanceHandle_t(), nullptr} }; + } + + if (exact) + { + // Looking for HANDLE_NIL, nothing to return + return { false, {InstanceHandle_t(), nullptr} }; + } + + // Looking for the first instance, return the ficticious one containing all changes + InstanceHandle_t tmp; + tmp.value[0] = 1; + return { true, {tmp, &m_changes} }; + } + + t_m_Inst_Caches::iterator it; + + if (exact) + { + it = keyed_changes_.find(handle); + } + else + { + auto comp = [](const InstanceHandle_t& h, const std::pair& it) + { + return h < it.first; + }; + it = std::upper_bound(keyed_changes_.begin(), keyed_changes_.end(), handle, comp); + } + + if (it != keyed_changes_.end()) + { + return { true, {it->first, &(it->second.cache_changes)} }; + } + return { false, {InstanceHandle_t(), nullptr} }; +} + +ReaderHistory::iterator DataReaderHistory::remove_change_nts( + ReaderHistory::const_iterator removal, + bool release) +{ + CacheChange_t* p_sample = nullptr; + + if ( removal != changesEnd() + && (p_sample = *removal)->instanceHandle.isDefined() + && has_keys_) + { + // clean any references to this CacheChange in the key state collection + auto it = keyed_changes_.find(p_sample->instanceHandle); + + // if keyed and in history must be in the map + assert(it != keyed_changes_.end()); + + auto& c = it->second.cache_changes; + c.erase(std::remove(c.begin(), c.end(), p_sample), c.end()); + } + + // call the base class + return ReaderHistory::remove_change_nts(removal, release); +} + +} // namespace detail +} // namsepace dds +} // namespace fastdds +} // namsepace eprosima diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp new file mode 100644 index 00000000000..236ba1f767b --- /dev/null +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp @@ -0,0 +1,249 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DataReaderHistory.hpp + */ + +#ifndef _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERHISTORY_HPP_ +#define _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERHISTORY_HPP_ + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace eprosima { +namespace fastdds { +namespace dds { +namespace detail { + +/** + * Class DataReaderHistory, container of the different CacheChanges of a DataReader + */ +class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory +{ +public: + + using MemoryManagementPolicy_t = eprosima::fastrtps::rtps::MemoryManagementPolicy_t; + using InstanceHandle_t = eprosima::fastrtps::rtps::InstanceHandle_t; + using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t; + + using instance_info = std::pair*>; + + /** + * Constructor. Requires information about the subscriber. + * @param topic_att TopicAttributes. + * @param type TopicDataType. + * @param qos ReaderQoS policy. + * @param payloadMax Maximum payload size per change. + * @param mempolicy Set whether the payloads ccan dynamically resized or not. + */ + DataReaderHistory( + const TypeSupport& type, + const TopicDescription& topic, + const DataReaderQos& qos); + + ~DataReaderHistory() override; + + /** + * Remove a specific change from the history. + * No Thread Safe + * @param removal iterator to the CacheChange_t to remove. + * @param release defaults to true and hints if the CacheChange_t should return to the pool + * @return iterator to the next CacheChange_t or end iterator. + */ + iterator remove_change_nts( + const_iterator removal, + bool release = true) override; + + /** + * Called when a change is received by the Subscriber. Will add the change to the history. + * @pre Change should not be already present in the history. + * @param[in] change The received change + * @param unknown_missing_changes_up_to Number of missing changes before this one + * @return + */ + bool received_change( + CacheChange_t* change, + size_t unknown_missing_changes_up_to) override; + + /** + * @brief Returns information about the first untaken sample. + * @param [out] info SampleInfo structure to store first untaken sample information. + * @return true if sample info was returned. false if there is no sample to take. + */ + bool get_first_untaken_info( + SampleInfo& info); + + /** + * This method is called to remove a change from the SubscriberHistory. + * @param change Pointer to the CacheChange_t. + * @return True if removed. + */ + bool remove_change_sub( + CacheChange_t* change); + + /** + * This method is called to remove a change from the SubscriberHistory. + * @param [in] change Pointer to the CacheChange_t. + * @param [in,out] it Iterator pointing to change on input. Will point to next valid change on output. + * @return True if removed. + */ + bool remove_change_sub( + CacheChange_t* change, + iterator& it); + + /** + * @brief A method to set the next deadline for the given instance + * @param handle The handle to the instance + * @param next_deadline_us The time point when the deadline will occur + * @return True if the deadline was set correctly + */ + bool set_next_deadline( + const InstanceHandle_t& handle, + const std::chrono::steady_clock::time_point& next_deadline_us); + + /** + * @brief A method to get the next instance handle that will miss the deadline and the time when the deadline will occur + * @param handle The handle to the instance + * @param next_deadline_us The time point when the instance will miss the deadline + * @return True if the deadline was retrieved successfully + */ + bool get_next_deadline( + InstanceHandle_t& handle, + std::chrono::steady_clock::time_point& next_deadline_us); + + /** + * @brief Get the list of changes corresponding to an instance handle. + * @param handle The handle to the instance. + * @param exact Indicates if the handle should match exactly (true) or if the first instance greater than the + * input handle should be returned. + * @return A pair where: + * - @c first is a boolean indicating if an instance was found + * - @c second is a pair where: + * - @c first is the handle of the returned instance + * - @c second is a pointer to a std::vector with the list of changes for the + * returned instance + * + * @remarks When used on a NO_KEY topic, an instance will only be returned when called with + * `handle = HANDLE_NIL` and `exact = false`. + */ + std::pair lookup_instance( + const InstanceHandle_t& handle, + bool exact); + +private: + + using t_m_Inst_Caches = std::map; + + //!Map where keys are instance handles and values vectors of cache changes + t_m_Inst_Caches keyed_changes_; + //!Time point when the next deadline will occur (only used for topics with no key) + std::chrono::steady_clock::time_point next_deadline_us_; + //!HistoryQosPolicy values. + HistoryQosPolicy history_qos_; + //!ResourceLimitsQosPolicy values. + ResourceLimitsQosPolicy resource_limited_qos_; + //!Topic name + fastrtps::string_255 topic_name_; + //!Type name + fastrtps::string_255 type_name_; + //!Whether the type has keys + bool has_keys_; + //!TopicDataType + fastdds::dds::TopicDataType* type_; + + //!Type object to deserialize Key + void* get_key_object_; + + /// Function processing a received change + std::function receive_fn_; + + /** + * @brief Method that finds a key in m_keyedChanges or tries to add it if not found + * @param a_change The change to get the key from + * @param map_it A map iterator to the given key + * @return True if it was found or could be added to the map + */ + bool find_key( + CacheChange_t* a_change, + t_m_Inst_Caches::iterator* map_it); + + /** + * @brief Method that finds a key in m_keyedChanges or tries to add it if not found + * @param a_change The change to get the key from + * @param map_it A map iterator to the given key + * @return True if it was found or could be added to the map + */ + bool find_key_for_change( + CacheChange_t* a_change, + t_m_Inst_Caches::iterator& map_it); + + /** + * @name Variants of incoming change processing. + * Will be called with the history mutex taken. + * @param[in] change The received change + * @param unknown_missing_changes_up_to Number of missing changes before this one + * @return + */ + ///@{ + bool received_change_keep_all_no_key( + CacheChange_t* change, + size_t unknown_missing_changes_up_to); + + bool received_change_keep_last_no_key( + CacheChange_t* change, + size_t unknown_missing_changes_up_to); + + bool received_change_keep_all_with_key( + CacheChange_t* change, + size_t unknown_missing_changes_up_to); + + bool received_change_keep_last_with_key( + CacheChange_t* change, + size_t unknown_missing_changes_up_to); + ///@} + + bool add_received_change( + CacheChange_t* a_change); + + bool add_received_change_with_key( + CacheChange_t* a_change, + std::vector& instance_changes); +}; + +} // namespace detail +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif // _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERHISTORY_HPP_ diff --git a/test/unittest/dds/status/CMakeLists.txt b/test/unittest/dds/status/CMakeLists.txt index c9c2d024def..20ad2c1ee09 100644 --- a/test/unittest/dds/status/CMakeLists.txt +++ b/test/unittest/dds/status/CMakeLists.txt @@ -36,6 +36,7 @@ set(LISTENERTESTS_SOURCE ListenerTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/SubscriberImpl.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReaderImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/SubscriberQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/DataReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp diff --git a/test/unittest/statistics/dds/CMakeLists.txt b/test/unittest/statistics/dds/CMakeLists.txt index b6884f41f39..ea8dc85c4cd 100644 --- a/test/unittest/statistics/dds/CMakeLists.txt +++ b/test/unittest/statistics/dds/CMakeLists.txt @@ -129,6 +129,7 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS) ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReaderImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/DataReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/Subscriber.cpp