From 27051183f13ac0a0fb43de3cbe1858e0394f8e8e Mon Sep 17 00:00:00 2001
From: Miguel Company <miguelcompany@eprosima.com>
Date: Tue, 21 Dec 2021 11:54:41 +0100
Subject: [PATCH] DataReaderHistory (#2363)

* Refactor DataReaderImpl to use a new DataReaderHistory class (#2177)

* Refs 12400. Duplicating SubscriberHistory into DataReaderHistory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Using DataReaderHistory on DataReaderImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Avoid using TopicAttributes.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Additional cleanup.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Using DDS SampleInfo.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12404. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Fixed DataReaderHistory (#2194)

* DataReader test for sample_info fields (#2193)

* Refs 12400. Initial test infrastructure.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Additional tests.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Tests are run twice with a take in the middle.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12469. Fixed warnings.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12469. Additional comments.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Adding implementation for instance_state and view_state (#2298)

* Refs 12400. Added DataReaderCacheChange.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Added DataReaderInstance.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. DataReaderHistory using new types.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Removing unnecessary method from SubscriberHistory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. ReadTakeCommand receives full instance information.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. ReadTakeCommand checks for instance states.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. ReadTakeCommand fills sample info from instance data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Added insert method to ResourceLimitedVector.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. DataReaderInstance uses ResourceLimitedVector.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. get_first_untaken_info takes sample info from instance data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Discard received change when older than oldest.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Fixing KEEP_ALL with keys.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Refactor to always use instances.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Basic structure for update instance state.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Adding alive_writers and current_owner to DataReaderInstance.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Implementing writer_alive.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Implementing writer_dispose.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Implementing writer_unregister.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Setting NOT_NEW on returned instances.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Correct return code when returning samples with no data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Set view_state to NEW when changing instance_state to ALIVE.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Moving generation counts into CacheChange_t.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Assigning generation counts after processing instance state.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Update instance_state when writer becomes not alive.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Clear alive_writers when changing generation.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Add writer_unmatched to ReaderHistory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. NOT_ALIVE_UNREGISTERED should not return valid data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Set autodispose_unregistered_instances to false on test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Remove instance when it becomes empty and is not alive.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Refactor into writer_not_alive.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Keeping samples from unmatched writers.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Avoid keeping non-notified samples.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Fixing compilation warnings on windows.

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 12758. Fixing assertion on WriterProxy logic. If we only notify fragmented DATA reception on completion we should only notify removal of fully assembled samples.

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 12758 Fixing DataReaderHistory test that checks DataWriter disposal behaviour.

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 12758. Use move semantics.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Use ResourceLimitedVector for writers.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Apply pre-allocation policies.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. PubSubReader. Account for writer_guid on last_seq checks.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Added can_change_be_added_nts.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Removed unused method.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Always use completed changes for key computation.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Fixed ResourceLimitedVector::insert.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Avoid dynamic allocation inside remove_changes_with_pred.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Optimization on DataReaderHistory::remove_change_nts.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Method writer_unmatched documented and improved.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Do not complete changes for non-keyed topics.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Do not remove incomplete changes for keyed topics.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Different removal policy on ReaderHistory and DataReaderHistory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Fix unused parameter warning.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Removed unused header.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Doxydoc improvements.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 12400. Adding checks for insert on ResourceLimitedVectorTests.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Fixed rvalue version of ResourceLimitedVector::insert.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Added feature to versions.md.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Fixed inclusion of DataReaderHistory.cpp on CMake files.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Add missing feature to versions.md.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: Miguel Barro <miguelbarro@eprosima.com>
---
 include/fastdds/dds/subscriber/SampleInfo.hpp |   2 +-
 include/fastdds/rtps/common/CacheChange.h     |   4 +
 include/fastdds/rtps/history/ReaderHistory.h  |  28 +
 .../fastrtps/subscriber/SubscriberHistory.h   |  21 -
 .../collections/ResourceLimitedVector.hpp     |  42 ++
 src/cpp/CMakeLists.txt                        |   3 +-
 src/cpp/fastdds/subscriber/DataReaderImpl.cpp |  59 +-
 src/cpp/fastdds/subscriber/DataReaderImpl.hpp |   5 +-
 .../DataReaderImpl/ReadTakeCommand.hpp        | 109 +--
 .../history/DataReaderCacheChange.hpp         |  39 +
 .../subscriber/history/DataReaderHistory.cpp  | 707 ++++++++++++++++++
 .../subscriber/history/DataReaderHistory.hpp  | 313 ++++++++
 .../subscriber/history/DataReaderInstance.hpp | 216 ++++++
 .../subscriber/SubscriberHistory.cpp          |  52 --
 src/cpp/rtps/history/ReaderHistory.cpp        |  37 +-
 src/cpp/rtps/reader/StatefulReader.cpp        |  52 +-
 src/cpp/rtps/reader/StatelessReader.cpp       |   2 +-
 src/cpp/utils/SystemInfo.cpp                  |   3 +-
 test/blackbox/api/dds-pim/PubSubReader.hpp    |   8 +-
 .../fastdds/rtps/history/ReaderHistory.h      |  12 +
 test/unittest/dds/publisher/CMakeLists.txt    |   1 +
 test/unittest/dds/status/CMakeLists.txt       |   1 +
 .../dds/subscriber/DataReaderTests.cpp        | 313 +++++++-
 test/unittest/statistics/dds/CMakeLists.txt   |   1 +
 .../utils/ResourceLimitedVectorTests.cpp      |  50 +-
 versions.md                                   |   2 +
 26 files changed, 1835 insertions(+), 247 deletions(-)
 create mode 100644 src/cpp/fastdds/subscriber/history/DataReaderCacheChange.hpp
 create mode 100644 src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
 create mode 100644 src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp
 create mode 100644 src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp

diff --git a/include/fastdds/dds/subscriber/SampleInfo.hpp b/include/fastdds/dds/subscriber/SampleInfo.hpp
index 7af134ba44e..8f9e5ba8da5 100644
--- a/include/fastdds/dds/subscriber/SampleInfo.hpp
+++ b/include/fastdds/dds/subscriber/SampleInfo.hpp
@@ -89,7 +89,7 @@ struct SampleInfo
 
     //! the generation difference between the time the sample was received, and the time the most recent sample was received.
     //! The most recent sample used for the calculation may or may not be in the returned collection
-    int32_t absoulte_generation_rank;
+    int32_t absolute_generation_rank;
 
     //! time provided by the DataWriter when the sample was written
     fastrtps::rtps::Time_t source_timestamp;
diff --git a/include/fastdds/rtps/common/CacheChange.h b/include/fastdds/rtps/common/CacheChange.h
index 556149ad8a9..1f3c260ca8a 100644
--- a/include/fastdds/rtps/common/CacheChange.h
+++ b/include/fastdds/rtps/common/CacheChange.h
@@ -57,6 +57,10 @@ struct CacheChangeReaderInfo_t
 {
     //!Reception TimeStamp (only used in Readers)
     Time_t receptionTimestamp;
+    //! Disposed generation of the instance when this entry was added to it
+    int32_t disposed_generation_count;
+    //! No-writers generation of the instance when this entry was added to it
+    int32_t no_writers_generation_count;
 };
 
 /**
diff --git a/include/fastdds/rtps/history/ReaderHistory.h b/include/fastdds/rtps/history/ReaderHistory.h
index 2d83ca02bbd..eb2a9fa6c39 100644
--- a/include/fastdds/rtps/history/ReaderHistory.h
+++ b/include/fastdds/rtps/history/ReaderHistory.h
@@ -152,6 +152,19 @@ class ReaderHistory : public History
             CacheChange_t** min_change,
             const GUID_t& writerGuid);
 
+    /**
+     * Called when a writer is unmatched from the reader holding this history.
+     *
+     * This method will remove all the changes on the history that came from the writer being unmatched and which have
+     * not yet been notified to the user.
+     *
+     * @param writer_guid        GUID of the writer being unmatched.
+     * @param last_notified_seq  Last sequence number from the specified writer that was notified to the user.
+     */
+    RTPS_DllAPI virtual void writer_unmatched(
+            const GUID_t& writer_guid,
+            const SequenceNumber_t& last_notified_seq);
+
 protected:
 
     RTPS_DllAPI bool do_reserve_cache(
@@ -161,6 +174,21 @@ class ReaderHistory : public History
     RTPS_DllAPI void do_release_cache(
             CacheChange_t* ch) override;
 
+    template<typename Pred>
+    inline void remove_changes_with_pred(
+            Pred pred)
+    {
+        assert(nullptr != mp_reader);
+        assert(nullptr != mp_mutex);
+
+        std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
+        std::vector<CacheChange_t*>::iterator new_end = std::remove_if(m_changes.begin(), m_changes.end(), pred);
+        while (new_end != m_changes.end())
+        {
+            new_end = remove_change_nts(new_end);
+        }
+    }
+
     //!Pointer to the reader
     RTPSReader* mp_reader;
 
diff --git a/include/fastrtps/subscriber/SubscriberHistory.h b/include/fastrtps/subscriber/SubscriberHistory.h
index d0d1f1fdf2b..be94ed6a291 100644
--- a/include/fastrtps/subscriber/SubscriberHistory.h
+++ b/include/fastrtps/subscriber/SubscriberHistory.h
@@ -44,8 +44,6 @@ class SubscriberHistory : public rtps::ReaderHistory
 {
 public:
 
-    using instance_info = std::pair<rtps::InstanceHandle_t, std::vector<rtps::CacheChange_t*>*>;
-
     /**
      * Constructor. Requires information about the subscriber.
      * @param topic_att TopicAttributes.
@@ -179,25 +177,6 @@ class SubscriberHistory : public rtps::ReaderHistory
             rtps::InstanceHandle_t& handle,
             std::chrono::steady_clock::time_point& next_deadline_us);
 
-    /**
-     * @brief Get the list of changes corresponding to an instance handle.
-     * @param handle The handle to the instance.
-     * @param exact  Indicates if the handle should match exactly (true) or if the first instance greater than the
-     *               input handle should be returned.
-     * @return A pair where:
-     *         - @c first is a boolean indicating if an instance was found
-     *         - @c second is a pair where:
-     *           - @c first is the handle of the returned instance
-     *           - @c second is a pointer to a std::vector<rtps::CacheChange_t*> with the list of changes for the
-     *             returned instance
-     *
-     * @remarks When used on a NO_KEY topic, an instance will only be returned when called with
-     *          `handle = HANDLE_NIL` and `exact = false`.
-     */
-    std::pair<bool, instance_info> lookup_instance(
-            const rtps::InstanceHandle_t& handle,
-            bool exact);
-
 private:
 
     using t_m_Inst_Caches = std::map<rtps::InstanceHandle_t, KeyedChanges>;
diff --git a/include/fastrtps/utils/collections/ResourceLimitedVector.hpp b/include/fastrtps/utils/collections/ResourceLimitedVector.hpp
index 91aa80a3c12..cafe1291053 100644
--- a/include/fastrtps/utils/collections/ResourceLimitedVector.hpp
+++ b/include/fastrtps/utils/collections/ResourceLimitedVector.hpp
@@ -119,6 +119,48 @@ class ResourceLimitedVector
         return *this;
     }
 
+    /**
+     * Insert value before pos.
+     *
+     * @param pos   iterator before which the content will be inserted. pos may be the end() iterator.
+     * @param value element value to insert.
+     *
+     * @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits.
+     */
+    iterator insert(
+            const_iterator pos,
+            const value_type& value)
+    {
+        auto dist = std::distance(collection_.cbegin(), pos);
+        if (!ensure_capacity())
+        {
+            return end();
+        }
+
+        return collection_.insert(collection_.cbegin() + dist, value);
+    }
+
+    /**
+     * Insert value before pos.
+     *
+     * @param pos   iterator before which the content will be inserted. pos may be the end() iterator.
+     * @param value element value to insert.
+     *
+     * @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits.
+     */
+    iterator insert(
+            const_iterator pos,
+            value_type&& value)
+    {
+        auto dist = std::distance(collection_.cbegin(), pos);
+        if (!ensure_capacity())
+        {
+            return end();
+        }
+
+        return collection_.insert(collection_.cbegin() + dist, std::move(value));
+    }
+
     /**
      * Add element at the end.
      *
diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt
index e8e38f74927..53b987cfeb8 100644
--- a/src/cpp/CMakeLists.txt
+++ b/src/cpp/CMakeLists.txt
@@ -85,9 +85,7 @@ set(${PROJECT_NAME}_source_files
     fastrtps_deprecated/subscriber/Subscriber.cpp
     fastrtps_deprecated/subscriber/SubscriberImpl.cpp
     fastrtps_deprecated/subscriber/SubscriberHistory.cpp
-    fastdds/subscriber/DataReader.cpp
     fastdds/publisher/DataWriter.cpp
-    fastdds/subscriber/DataReaderImpl.cpp
     fastdds/publisher/DataWriterImpl.cpp
     fastdds/topic/ContentFilteredTopic.cpp
     fastdds/topic/Topic.cpp
@@ -105,6 +103,7 @@ set(${PROJECT_NAME}_source_files
     fastdds/subscriber/Subscriber.cpp
     fastdds/subscriber/DataReader.cpp
     fastdds/subscriber/DataReaderImpl.cpp
+    fastdds/subscriber/history/DataReaderHistory.cpp
     fastdds/domain/DomainParticipantFactory.cpp
     fastdds/domain/DomainParticipantImpl.cpp
     fastdds/domain/DomainParticipant.cpp
diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp
index 61c7eaf8dc6..c013ac36b94 100644
--- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp
+++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp
@@ -62,40 +62,6 @@ namespace eprosima {
 namespace fastdds {
 namespace dds {
 
-static void sample_info_to_dds (
-        const SampleInfo_t& rtps_info,
-        SampleInfo* dds_info)
-{
-    dds_info->sample_state = NOT_READ_SAMPLE_STATE;
-    dds_info->view_state = NOT_NEW_VIEW_STATE;
-    dds_info->disposed_generation_count = 0;
-    dds_info->no_writers_generation_count = 1;
-    dds_info->sample_rank = 0;
-    dds_info->generation_rank = 0;
-    dds_info->absoulte_generation_rank = 0;
-    dds_info->source_timestamp = rtps_info.sourceTimestamp;
-    dds_info->reception_timestamp = rtps_info.receptionTimestamp;
-    dds_info->instance_handle = rtps_info.iHandle;
-    dds_info->publication_handle = fastrtps::rtps::InstanceHandle_t(rtps_info.sample_identity.writer_guid());
-    dds_info->sample_identity = rtps_info.sample_identity;
-    dds_info->related_sample_identity = rtps_info.related_sample_identity;
-    dds_info->valid_data = rtps_info.sampleKind == eprosima::fastrtps::rtps::ALIVE ? true : false;
-
-    switch (rtps_info.sampleKind)
-    {
-        case eprosima::fastrtps::rtps::ALIVE:
-            dds_info->instance_state = ALIVE_INSTANCE_STATE;
-            break;
-        case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED:
-            dds_info->instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE;
-            break;
-        default:
-            //TODO [ILG] change this if the other kinds ever get implemented
-            dds_info->instance_state = ALIVE_INSTANCE_STATE;
-            break;
-    }
-}
-
 static bool collections_have_same_properties(
         const LoanableCollection& data_values,
         const SampleInfoSeq& sample_infos)
@@ -131,11 +97,7 @@ DataReaderImpl::DataReaderImpl(
     , topic_(topic)
     , qos_(&qos == &DATAREADER_QOS_DEFAULT ? subscriber_->get_default_datareader_qos() : qos)
 #pragma warning (disable : 4355 )
-    , history_(topic_attributes(),
-            type_.get(),
-            qos_.get_readerqos(subscriber_->get_qos()),
-            type_->m_typeSize + 3,    /* Possible alignment */
-            qos_.endpoint().history_memory_policy)
+    , history_(type, *topic, qos_)
     , listener_(listener)
     , reader_listener_(this)
     , deadline_duration_us_(qos_.deadline().period.to_ns() * 1e-3)
@@ -714,10 +676,8 @@ ReturnCode_t DataReaderImpl::get_first_untaken_info(
         return ReturnCode_t::RETCODE_NOT_ENABLED;
     }
 
-    SampleInfo_t rtps_info;
-    if (history_.get_first_untaken_info(&rtps_info))
+    if (history_.get_first_untaken_info(*info))
     {
-        sample_info_to_dds(rtps_info, info);
         return ReturnCode_t::RETCODE_OK;
     }
     return ReturnCode_t::RETCODE_NO_DATA;
@@ -892,10 +852,10 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos(
 bool DataReaderImpl::on_new_cache_change_added(
         const CacheChange_t* const change)
 {
+    std::lock_guard<RecursiveTimedMutex> guard(reader_->getMutex());
+
     if (qos_.deadline().period != c_TimeInfinite)
     {
-        std::unique_lock<RecursiveTimedMutex> lock(reader_->getMutex());
-
         if (!history_.set_next_deadline(
                     change->instanceHandle,
                     steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
@@ -913,6 +873,7 @@ bool DataReaderImpl::on_new_cache_change_added(
     }
 
     CacheChange_t* new_change = const_cast<CacheChange_t*>(change);
+    history_.update_instance_nts(new_change);
 
     if (qos_.lifespan().duration == c_TimeInfinite)
     {
@@ -968,6 +929,11 @@ void DataReaderImpl::update_subscription_matched_status(
         subscription_matched_status_.last_publication_handle = status.last_publication_handle;
     }
 
+    if (count_change < 0)
+    {
+        history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
+    }
+
     StatusMask notify_status = StatusMask::subscription_matched();
     DataReaderListener* listener = get_listener_for(notify_status);
     if (listener != nullptr)
@@ -1235,6 +1201,11 @@ RequestedIncompatibleQosStatus& DataReaderImpl::update_requested_incompatible_qo
 LivelinessChangedStatus& DataReaderImpl::update_liveliness_status(
         const fastrtps::LivelinessChangedStatus& status)
 {
+    if (0 < status.not_alive_count_change)
+    {
+        history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
+    }
+
     liveliness_changed_status_.alive_count = status.alive_count;
     liveliness_changed_status_.not_alive_count = status.not_alive_count;
     liveliness_changed_status_.alive_count_change += status.alive_count_change;
diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp
index d083681e19d..44aeafc7152 100644
--- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp
+++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp
@@ -36,7 +36,6 @@
 #include <fastdds/rtps/reader/ReaderListener.h>
 
 #include <fastrtps/attributes/TopicAttributes.h>
-#include <fastrtps/subscriber/SubscriberHistory.h>
 #include <fastrtps/qos/LivelinessChangedStatus.h>
 #include <fastrtps/types/TypesBase.h>
 
@@ -46,6 +45,8 @@
 #include <fastdds/subscriber/SubscriberImpl.hpp>
 #include <rtps/history/ITopicPayloadPool.h>
 
+#include <fastdds/subscriber/history/DataReaderHistory.hpp>
+
 using eprosima::fastrtps::types::ReturnCode_t;
 
 namespace eprosima {
@@ -331,7 +332,7 @@ class DataReaderImpl
     DataReaderQos qos_;
 
     //!History
-    fastrtps::SubscriberHistory history_;
+    detail::DataReaderHistory history_;
 
     //!Listener
     DataReaderListener* listener_ = nullptr;
diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
index dbd4a79b6ea..5ba529ebed3 100644
--- a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
+++ b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
@@ -27,7 +27,6 @@
 #include <fastdds/dds/topic/TypeSupport.hpp>
 #include <fastdds/dds/subscriber/SampleInfo.hpp>
 
-#include <fastrtps/subscriber/SubscriberHistory.h>
 #include <fastrtps/types/TypesBase.h>
 
 #include <fastdds/subscriber/DataReaderImpl.hpp>
@@ -35,6 +34,7 @@
 #include <fastdds/subscriber/DataReaderImpl/StateFilter.hpp>
 #include <fastdds/subscriber/DataReaderImpl/SampleInfoPool.hpp>
 #include <fastdds/subscriber/DataReaderImpl/SampleLoanManager.hpp>
+#include <fastdds/subscriber/history/DataReaderHistory.hpp>
 
 #include <fastdds/rtps/common/CacheChange.h>
 #include <fastdds/rtps/reader/RTPSReader.h>
@@ -51,7 +51,7 @@ namespace detail {
 struct ReadTakeCommand
 {
     using ReturnCode_t = eprosima::fastrtps::types::ReturnCode_t;
-    using history_type = eprosima::fastrtps::SubscriberHistory;
+    using history_type = eprosima::fastdds::dds::detail::DataReaderHistory;
     using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t;
     using RTPSReader = eprosima::fastrtps::rtps::RTPSReader;
     using WriterProxy = eprosima::fastrtps::rtps::WriterProxy;
@@ -64,7 +64,7 @@ struct ReadTakeCommand
             SampleInfoSeq& sample_infos,
             int32_t max_samples,
             const StateFilter& states,
-            history_type::instance_info instance,
+            const history_type::instance_info& instance,
             bool single_instance = false)
         : type_(reader.type_)
         , loan_manager_(reader.loan_manager_)
@@ -108,8 +108,8 @@ struct ReadTakeCommand
         // Traverse changes on current instance
         bool ret_val = false;
         LoanableCollection::size_type first_slot = current_slot_;
-        auto it = instance_.second->begin();
-        while (!finished_ && it != instance_.second->end())
+        auto it = instance_.second->cache_changes.begin();
+        while (!finished_ && it != instance_.second->cache_changes.end())
         {
             CacheChange_t* change = *it;
             SampleStateKind check;
@@ -142,10 +142,9 @@ struct ReadTakeCommand
                 // in the future also
                 if (!is_future_change)
                 {
-
                     // Add sample and info to collections
                     ReturnCode_t previous_return_value = return_value_;
-                    bool added = add_sample(change, remove_change);
+                    bool added = add_sample(*it, remove_change);
                     reader_->end_sample_access_nts(change, wp, added);
 
                     // Check if the payload is dirty
@@ -181,6 +180,7 @@ struct ReadTakeCommand
 
         if (current_slot_ > first_slot)
         {
+            instance_.second->view_state = ViewStateKind::NOT_NEW_VIEW_STATE;
             ret_val = true;
 
             // complete sample infos
@@ -208,6 +208,41 @@ struct ReadTakeCommand
         return return_value_;
     }
 
+    static void generate_info(
+            SampleInfo& info,
+            const DataReaderInstance& instance,
+            const DataReaderCacheChange& item)
+    {
+        info.sample_state = item->isRead ? READ_SAMPLE_STATE : NOT_READ_SAMPLE_STATE;
+        info.instance_state = instance.instance_state;
+        info.view_state = instance.view_state;
+        info.disposed_generation_count = item->reader_info.disposed_generation_count;
+        info.no_writers_generation_count = item->reader_info.no_writers_generation_count;
+        info.sample_rank = 0;
+        info.generation_rank = 0;
+        info.absolute_generation_rank = 0;
+        info.source_timestamp = item->sourceTimestamp;
+        info.reception_timestamp = item->reader_info.receptionTimestamp;
+        info.instance_handle = item->instanceHandle;
+        info.publication_handle = InstanceHandle_t(item->writerGUID);
+        info.sample_identity.writer_guid(item->writerGUID);
+        info.sample_identity.sequence_number(item->sequenceNumber);
+        info.related_sample_identity = item->write_params.sample_identity();
+        info.valid_data = true;
+
+        switch (item->kind)
+        {
+            case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED:
+            case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED:
+            case eprosima::fastrtps::rtps::NOT_ALIVE_UNREGISTERED:
+                info.valid_data = false;
+                break;
+            case eprosima::fastrtps::rtps::ALIVE:
+            default:
+                break;
+        }
+    }
+
 private:
 
     const TypeSupport& type_;
@@ -243,14 +278,15 @@ struct ReadTakeCommand
 
     bool is_current_instance_valid()
     {
-        // We are not implementing instance_state or view_state yet, so all instances will be considered to have
-        // a valid state. In the future this should check instance_state against states_.instance_states and
-        // view_state against states_.view_states
-        return true;
+        // Check instance_state against states_.instance_states and view_state against states_.view_states
+        auto instance_state = instance_.second->instance_state;
+        auto view_state = instance_.second->view_state;
+        return (0 != (states_.instance_states & instance_state)) && (0 != (states_.view_states & view_state));
     }
 
     bool next_instance()
     {
+        history_.check_and_remove_instance(instance_);
         if (single_instance_)
         {
             finished_ = true;
@@ -270,7 +306,7 @@ struct ReadTakeCommand
     }
 
     bool add_sample(
-            CacheChange_t* change,
+            const DataReaderCacheChange& item,
             bool& deserialization_error)
     {
         bool ret_val = false;
@@ -284,10 +320,10 @@ struct ReadTakeCommand
             sample_infos_.length(new_len);
 
             // Add information
-            generate_info(change);
+            generate_info(item);
             if (sample_infos_[current_slot_].valid_data)
             {
-                if (!deserialize_sample(change))
+                if (!deserialize_sample(item))
                 {
                     // Decrement length of collections
                     data_values_.length(current_slot_);
@@ -295,11 +331,10 @@ struct ReadTakeCommand
                     deserialization_error = true;
                     return false;
                 }
-
-                // Mark that some data is available
-                return_value_ = ReturnCode_t::RETCODE_OK;
             }
 
+            // Mark that some data is available
+            return_value_ = ReturnCode_t::RETCODE_OK;
             ++current_slot_;
             --remaining_samples_;
             ret_val = true;
@@ -330,48 +365,18 @@ struct ReadTakeCommand
     }
 
     void generate_info(
-            CacheChange_t* change)
+            const DataReaderCacheChange& item)
     {
         // Loan when necessary
         if (!sample_infos_.has_ownership())
         {
-            SampleInfo* item = info_pool_.get_item();
-            assert(item != nullptr);
-            const_cast<void**>(sample_infos_.buffer())[current_slot_] = item;
+            SampleInfo* pool_item = info_pool_.get_item();
+            assert(pool_item != nullptr);
+            const_cast<void**>(sample_infos_.buffer())[current_slot_] = pool_item;
         }
 
         SampleInfo& info = sample_infos_[current_slot_];
-        info.sample_state = change->isRead ? READ_SAMPLE_STATE : NOT_READ_SAMPLE_STATE;
-        info.view_state = NOT_NEW_VIEW_STATE;
-        info.disposed_generation_count = 0;
-        info.no_writers_generation_count = 1;
-        info.sample_rank = 0;
-        info.generation_rank = 0;
-        info.absoulte_generation_rank = 0;
-        info.source_timestamp = change->sourceTimestamp;
-        info.reception_timestamp = change->reader_info.receptionTimestamp;
-        info.instance_handle = handle_;
-        info.publication_handle = InstanceHandle_t(change->writerGUID);
-        info.sample_identity.writer_guid(change->writerGUID);
-        info.sample_identity.sequence_number(change->sequenceNumber);
-        info.related_sample_identity = change->write_params.sample_identity();
-        info.valid_data = true;
-
-        switch (change->kind)
-        {
-            case eprosima::fastrtps::rtps::ALIVE:
-                info.instance_state = ALIVE_INSTANCE_STATE;
-                break;
-            case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED:
-            case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED:
-                info.instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE;
-                info.valid_data = false;
-                break;
-            default:
-                //TODO [ILG] change this if the other kinds ever get implemented
-                info.instance_state = ALIVE_INSTANCE_STATE;
-                break;
-        }
+        generate_info(info, *instance_.second, item);
     }
 
     bool check_datasharing_validity(
diff --git a/src/cpp/fastdds/subscriber/history/DataReaderCacheChange.hpp b/src/cpp/fastdds/subscriber/history/DataReaderCacheChange.hpp
new file mode 100644
index 00000000000..5b13109f721
--- /dev/null
+++ b/src/cpp/fastdds/subscriber/history/DataReaderCacheChange.hpp
@@ -0,0 +1,39 @@
+// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima).
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/**
+ * @file DataReaderCacheChange.hpp
+ */
+
+#ifndef _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERCACHECHANGE_HPP_
+#define _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERCACHECHANGE_HPP_
+
+#include <cstdint>
+
+#include <fastdds/rtps/common/CacheChange.h>
+
+namespace eprosima {
+namespace fastdds {
+namespace dds {
+namespace detail {
+
+/// A DataReader cache entry
+using DataReaderCacheChange = fastrtps::rtps::CacheChange_t*;
+
+} /* namespace detail */
+} /* namespace dds */
+} /* namespace fastdds */
+} /* namespace eprosima */
+
+#endif  // _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERCACHECHANGE_HPP_
diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
new file mode 100644
index 00000000000..9ca5826bf1c
--- /dev/null
+++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
@@ -0,0 +1,707 @@
+// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima).
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/**
+ * @file DataReaderHistory.cpp
+ */
+
+#include <limits>
+#include <mutex>
+
+#include "DataReaderHistory.hpp"
+
+#include <fastdds/dds/topic/TopicDescription.hpp>
+#include <fastdds/dds/topic/TypeSupport.hpp>
+
+#include <fastdds/dds/log/Log.hpp>
+#include <fastdds/rtps/reader/RTPSReader.h>
+
+#include <fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp>
+#include <rtps/reader/WriterProxy.h>
+#include <utils/collections/sorted_vector_insert.hpp>
+
+namespace eprosima {
+namespace fastdds {
+namespace dds {
+namespace detail {
+
+using namespace eprosima::fastrtps::rtps;
+
+using eprosima::fastrtps::RecursiveTimedMutex;
+
+static HistoryAttributes to_history_attributes(
+        const TypeSupport& type,
+        const DataReaderQos& qos)
+{
+    auto initial_samples = qos.resource_limits().allocated_samples;
+    auto max_samples = qos.resource_limits().max_samples;
+
+    if (qos.history().kind != KEEP_ALL_HISTORY_QOS)
+    {
+        max_samples = qos.history().depth;
+        if (type->m_isGetKeyDefined)
+        {
+            max_samples *= qos.resource_limits().max_instances;
+        }
+
+        initial_samples = std::min(initial_samples, max_samples);
+    }
+
+    auto mempolicy = qos.endpoint().history_memory_policy;
+    auto payloadMaxSize = type->m_typeSize + 3; // possible alignment
+
+    return HistoryAttributes(mempolicy, payloadMaxSize, initial_samples, max_samples);
+}
+
+DataReaderHistory::DataReaderHistory(
+        const TypeSupport& type,
+        const TopicDescription& topic,
+        const DataReaderQos& qos)
+    : ReaderHistory(to_history_attributes(type, qos))
+    , key_writers_allocation_(qos.reader_resource_limits().matched_publisher_allocation)
+    , history_qos_(qos.history())
+    , resource_limited_qos_(qos.resource_limits())
+    , topic_name_(topic.get_name())
+    , type_name_(topic.get_type_name())
+    , has_keys_(type->m_isGetKeyDefined)
+    , type_(type.get())
+    , get_key_object_(nullptr)
+{
+    if (resource_limited_qos_.max_samples == 0)
+    {
+        resource_limited_qos_.max_samples = std::numeric_limits<int32_t>::max();
+    }
+
+    if (resource_limited_qos_.max_instances == 0)
+    {
+        resource_limited_qos_.max_instances = std::numeric_limits<int32_t>::max();
+    }
+
+    if (resource_limited_qos_.max_samples_per_instance == 0)
+    {
+        resource_limited_qos_.max_samples_per_instance = std::numeric_limits<int32_t>::max();
+    }
+
+    if (type_->m_isGetKeyDefined)
+    {
+        get_key_object_ = type_->createData();
+
+        if (resource_limited_qos_.max_samples_per_instance < std::numeric_limits<int32_t>::max())
+        {
+            key_changes_allocation_.maximum = resource_limited_qos_.max_samples_per_instance;
+        }
+    }
+    else
+    {
+        resource_limited_qos_.max_instances = 1;
+        resource_limited_qos_.max_samples_per_instance = resource_limited_qos_.max_samples;
+        key_changes_allocation_.initial = resource_limited_qos_.allocated_samples;
+        key_changes_allocation_.maximum = resource_limited_qos_.max_samples;
+
+        keyed_changes_.emplace(c_InstanceHandle_Unknown,
+                DataReaderInstance{ key_changes_allocation_, key_writers_allocation_ });
+    }
+
+    using std::placeholders::_1;
+    using std::placeholders::_2;
+
+    receive_fn_ = qos.history().kind == KEEP_ALL_HISTORY_QOS ?
+            std::bind(&DataReaderHistory::received_change_keep_all, this, _1, _2) :
+            std::bind(&DataReaderHistory::received_change_keep_last, this, _1, _2);
+
+    complete_fn_ = qos.history().kind == KEEP_ALL_HISTORY_QOS ?
+            std::bind(&DataReaderHistory::completed_change_keep_all, this, _1, _2) :
+            std::bind(&DataReaderHistory::completed_change_keep_last, this, _1, _2);
+
+    if (!has_keys_)
+    {
+        compute_key_for_change_fn_ = [](CacheChange_t* change)
+                {
+                    change->instanceHandle = c_InstanceHandle_Unknown;
+                    return true;
+                };
+    }
+    else
+    {
+        compute_key_for_change_fn_ =
+                [this](CacheChange_t* a_change)
+                {
+                    if (a_change->instanceHandle.isDefined())
+                    {
+                        return true;
+                    }
+
+                    if (!a_change->is_fully_assembled())
+                    {
+                        return false;
+                    }
+
+                    if (type_ != nullptr)
+                    {
+                        logInfo(SUBSCRIBER, "Getting Key of change with no Key transmitted");
+                        type_->deserialize(&a_change->serializedPayload, get_key_object_);
+                        bool is_key_protected = false;
+#if HAVE_SECURITY
+                        is_key_protected = mp_reader->getAttributes().security_attributes().is_key_protected;
+#endif // if HAVE_SECURITY
+                        return type_->getKey(get_key_object_, &a_change->instanceHandle, is_key_protected);
+                    }
+
+                    logWarning(SUBSCRIBER, "NO KEY in topic: " << topic_name_
+                                                               << " and no method to obtain it"; );
+                    return false;
+                };
+    }
+}
+
+DataReaderHistory::~DataReaderHistory()
+{
+    if (type_->m_isGetKeyDefined)
+    {
+        type_->deleteData(get_key_object_);
+    }
+}
+
+bool DataReaderHistory::can_change_be_added_nts(
+        const GUID_t& writer_guid,
+        uint32_t total_payload_size,
+        size_t unknown_missing_changes_up_to,
+        bool& will_never_be_accepted) const
+{
+    if (!ReaderHistory::can_change_be_added_nts(writer_guid, total_payload_size, unknown_missing_changes_up_to,
+            will_never_be_accepted))
+    {
+        return false;
+    }
+
+    will_never_be_accepted = false;
+    return (KEEP_ALL_HISTORY_QOS != history_qos_.kind) ||
+           (m_changes.size() + unknown_missing_changes_up_to < static_cast<size_t>(resource_limited_qos_.max_samples));
+}
+
+bool DataReaderHistory::received_change(
+        CacheChange_t* a_change,
+        size_t unknown_missing_changes_up_to)
+{
+    if (mp_reader == nullptr || mp_mutex == nullptr)
+    {
+        logError(SUBSCRIBER, "You need to create a Reader with this History before using it");
+        return false;
+    }
+
+    std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
+    return receive_fn_(a_change, unknown_missing_changes_up_to);
+}
+
+bool DataReaderHistory::received_change_keep_all(
+        CacheChange_t* a_change,
+        size_t unknown_missing_changes_up_to)
+{
+    if (!compute_key_for_change_fn_(a_change))
+    {
+        // Store the sample temporally only in ReaderHistory. When completed it will be stored in SubscriberHistory too.
+        return add_to_reader_history_if_not_full(a_change);
+    }
+
+    InstanceCollection::iterator vit;
+    if (find_key(a_change->instanceHandle, vit))
+    {
+        DataReaderInstance::ChangeCollection& instance_changes = vit->second.cache_changes;
+        size_t total_size = instance_changes.size() + unknown_missing_changes_up_to;
+        if (total_size < static_cast<size_t>(resource_limited_qos_.max_samples_per_instance))
+        {
+            return add_received_change_with_key(a_change, vit->second);
+        }
+
+        logInfo(SUBSCRIBER, "Change not added due to maximum number of samples per instance");
+    }
+
+    return false;
+}
+
+bool DataReaderHistory::received_change_keep_last(
+        CacheChange_t* a_change,
+        size_t /* unknown_missing_changes_up_to */)
+{
+    if (!compute_key_for_change_fn_(a_change))
+    {
+        // Store the sample temporally only in ReaderHistory. When completed it will be stored in SubscriberHistory too.
+        return add_to_reader_history_if_not_full(a_change);
+    }
+
+    InstanceCollection::iterator vit;
+    if (find_key(a_change->instanceHandle, vit))
+    {
+        bool add = false;
+        DataReaderInstance::ChangeCollection& instance_changes = vit->second.cache_changes;
+        if (instance_changes.size() < static_cast<size_t>(history_qos_.depth))
+        {
+            add = true;
+        }
+        else
+        {
+            // Try to substitute the oldest sample.
+            CacheChange_t* first_change = instance_changes.at(0);
+            if (a_change->sourceTimestamp < first_change->sourceTimestamp)
+            {
+                // Received change is older than oldest, and should be discarded
+                return true;
+            }
+
+            // As the instance is ordered by source timestamp, we can always remove the first one.
+            add = remove_change_sub(first_change);
+        }
+
+        if (add)
+        {
+            return add_received_change_with_key(a_change, vit->second);
+        }
+    }
+
+    return false;
+}
+
+bool DataReaderHistory::add_received_change_with_key(
+        CacheChange_t* a_change,
+        DataReaderInstance& instance)
+{
+    if (add_to_reader_history_if_not_full(a_change))
+    {
+        add_to_instance(a_change, instance);
+        return true;
+    }
+
+    return false;
+}
+
+bool DataReaderHistory::add_to_reader_history_if_not_full(
+        CacheChange_t* a_change)
+{
+    if (m_isHistoryFull)
+    {
+        // Discarding the sample.
+        logWarning(SUBSCRIBER, "Attempting to add Data to Full ReaderHistory: " << type_name_);
+        return false;
+    }
+
+    bool ret_value = add_change(a_change);
+    if (m_changes.size() == static_cast<size_t>(m_att.maximumReservedCaches))
+    {
+        m_isHistoryFull = true;
+    }
+    return ret_value;
+}
+
+void DataReaderHistory::add_to_instance(
+        CacheChange_t* a_change,
+        DataReaderInstance& instance)
+{
+    // ADD TO KEY VECTOR
+    DataReaderCacheChange item = a_change;
+    eprosima::utilities::collections::sorted_vector_insert(instance.cache_changes, item,
+            [](const DataReaderCacheChange& lhs, const DataReaderCacheChange& rhs)
+            {
+                return lhs->sourceTimestamp < rhs->sourceTimestamp;
+            });
+
+    logInfo(SUBSCRIBER, mp_reader->getGuid().entityId
+            << ": Change " << a_change->sequenceNumber << " added from: "
+            << a_change->writerGUID << " with KEY: " << a_change->instanceHandle; );
+}
+
+bool DataReaderHistory::get_first_untaken_info(
+        SampleInfo& info)
+{
+    std::lock_guard<RecursiveTimedMutex> lock(*mp_mutex);
+
+    CacheChange_t* change = nullptr;
+    WriterProxy* wp = nullptr;
+    if (mp_reader->nextUntakenCache(&change, &wp))
+    {
+        auto it = keyed_changes_.find(change->instanceHandle);
+        assert(it != keyed_changes_.end());
+        auto& instance_changes = it->second.cache_changes;
+        auto item =
+                std::find_if(instance_changes.cbegin(), instance_changes.cend(),
+                        [change](const DataReaderCacheChange& v)
+                        {
+                            return v == change;
+                        });
+        ReadTakeCommand::generate_info(info, it->second, *item);
+        mp_reader->change_read_by_user(change, wp, false);
+        return true;
+    }
+
+    return false;
+}
+
+bool DataReaderHistory::find_key(
+        const InstanceHandle_t& handle,
+        InstanceCollection::iterator& vit_out)
+{
+    InstanceCollection::iterator vit;
+    vit = keyed_changes_.find(handle);
+    if (vit != keyed_changes_.end())
+    {
+        vit_out = vit;
+        return true;
+    }
+
+    if (keyed_changes_.size() < static_cast<size_t>(resource_limited_qos_.max_instances))
+    {
+        vit_out = keyed_changes_.emplace(handle,
+                        DataReaderInstance{key_changes_allocation_, key_writers_allocation_}).first;
+        return true;
+    }
+
+    for (vit = keyed_changes_.begin(); vit != keyed_changes_.end(); ++vit)
+    {
+        if (vit->second.cache_changes.size() == 0)
+        {
+            keyed_changes_.erase(vit);
+            vit_out = keyed_changes_.emplace(handle,
+                            DataReaderInstance{ key_changes_allocation_, key_writers_allocation_ }).first;
+            return true;
+        }
+    }
+
+    logWarning(SUBSCRIBER, "History has reached the maximum number of instances");
+    return false;
+}
+
+void DataReaderHistory::writer_unmatched(
+        const GUID_t& writer_guid,
+        const SequenceNumber_t& last_notified_seq)
+{
+    remove_changes_with_pred(
+        [&writer_guid, &last_notified_seq](CacheChange_t* ch)
+        {
+            return (writer_guid == ch->writerGUID) && (last_notified_seq < ch->sequenceNumber);
+        });
+}
+
+bool DataReaderHistory::remove_change_sub(
+        CacheChange_t* change)
+{
+    if (mp_reader == nullptr || mp_mutex == nullptr)
+    {
+        logError(SUBSCRIBER, "You need to create a Reader with this History before using it");
+        return false;
+    }
+
+    std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
+    bool found = false;
+    InstanceCollection::iterator vit;
+    if (find_key(change->instanceHandle, vit))
+    {
+        for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit)
+        {
+            if ((*chit)->sequenceNumber == change->sequenceNumber &&
+                    (*chit)->writerGUID == change->writerGUID)
+            {
+                vit->second.cache_changes.erase(chit);
+                found = true;
+                break;
+            }
+        }
+    }
+    if (!found)
+    {
+        logError(SUBSCRIBER, "Change not found on this key, something is wrong");
+    }
+
+    if (remove_change(change))
+    {
+        m_isHistoryFull = false;
+        return true;
+    }
+
+    return false;
+}
+
+bool DataReaderHistory::remove_change_sub(
+        CacheChange_t* change,
+        DataReaderInstance::ChangeCollection::iterator& it)
+{
+    if (mp_reader == nullptr || mp_mutex == nullptr)
+    {
+        logError(SUBSCRIBER, "You need to create a Reader with this History before using it");
+        return false;
+    }
+
+    std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
+    bool found = false;
+    InstanceCollection::iterator vit;
+    if (find_key(change->instanceHandle, vit))
+    {
+        for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit)
+        {
+            if ((*chit)->sequenceNumber == change->sequenceNumber &&
+                    (*chit)->writerGUID == change->writerGUID)
+            {
+                assert(it == chit);
+                it = vit->second.cache_changes.erase(chit);
+                found = true;
+                break;
+            }
+        }
+    }
+    if (!found)
+    {
+        logError(SUBSCRIBER, "Change not found on this key, something is wrong");
+    }
+
+    const_iterator chit = find_change_nts(change);
+    if (chit == changesEnd())
+    {
+        logInfo(RTPS_WRITER_HISTORY, "Trying to remove a change not in history");
+        return false;
+    }
+
+    m_isHistoryFull = false;
+    ReaderHistory::remove_change_nts(chit);
+
+    return true;
+}
+
+bool DataReaderHistory::set_next_deadline(
+        const InstanceHandle_t& handle,
+        const std::chrono::steady_clock::time_point& next_deadline_us)
+{
+    if (mp_reader == nullptr || mp_mutex == nullptr)
+    {
+        logError(SUBSCRIBER, "You need to create a Reader with this History before using it");
+        return false;
+    }
+    std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
+    auto it = keyed_changes_.find(handle);
+    if (it == keyed_changes_.end())
+    {
+        return false;
+    }
+
+    it->second.next_deadline_us = next_deadline_us;
+    return true;
+}
+
+bool DataReaderHistory::get_next_deadline(
+        InstanceHandle_t& handle,
+        std::chrono::steady_clock::time_point& next_deadline_us)
+{
+    if (mp_reader == nullptr || mp_mutex == nullptr)
+    {
+        logError(SUBSCRIBER, "You need to create a Reader with this History before using it");
+        return false;
+    }
+    std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
+    auto min = std::min_element(keyed_changes_.begin(),
+                    keyed_changes_.end(),
+                    [](
+                        const std::pair<InstanceHandle_t, DataReaderInstance>& lhs,
+                        const std::pair<InstanceHandle_t, DataReaderInstance>& rhs)
+                    {
+                        return lhs.second.next_deadline_us < rhs.second.next_deadline_us;
+                    });
+    handle = min->first;
+    next_deadline_us = min->second.next_deadline_us;
+    return true;
+}
+
+std::pair<bool, DataReaderHistory::instance_info> DataReaderHistory::lookup_instance(
+        const InstanceHandle_t& handle,
+        bool exact)
+{
+    if (!has_keys_)
+    {
+        if (handle.isDefined())
+        {
+            // NO_KEY topics can only return the ficticious instance.
+            // Execution can only get here for two reasons:
+            // - Looking for a specific instance (exact = true)
+            // - Looking for the next instance to the ficticious one (exact = false)
+            // In both cases, no instance should be returned
+            return { false, {InstanceHandle_t(), nullptr} };
+        }
+
+        if (exact)
+        {
+            // Looking for HANDLE_NIL, nothing to return
+            return { false, {InstanceHandle_t(), nullptr} };
+        }
+
+        // Looking for the first instance, return the ficticious one containing all changes
+        InstanceHandle_t tmp;
+        tmp.value[0] = 1;
+        return { true, {tmp, &keyed_changes_.begin()->second} };
+    }
+
+    InstanceCollection::iterator it;
+
+    if (exact)
+    {
+        it = keyed_changes_.find(handle);
+    }
+    else
+    {
+        auto comp = [](const InstanceHandle_t& h, const std::pair<InstanceHandle_t, DataReaderInstance>& it)
+                {
+                    return h < it.first;
+                };
+        it = std::upper_bound(keyed_changes_.begin(), keyed_changes_.end(), handle, comp);
+    }
+
+    if (it != keyed_changes_.end())
+    {
+        return { true, {it->first, &(it->second)} };
+    }
+    return { false, {InstanceHandle_t(), nullptr} };
+}
+
+void DataReaderHistory::check_and_remove_instance(
+        DataReaderHistory::instance_info& instance_info)
+{
+    DataReaderInstance* instance = instance_info.second;
+    if (instance->cache_changes.empty() &&
+            (InstanceStateKind::ALIVE_INSTANCE_STATE != instance->instance_state) &&
+            instance_info.first.isDefined())
+    {
+        keyed_changes_.erase(instance_info.first);
+        instance_info.second = nullptr;
+    }
+}
+
+ReaderHistory::iterator DataReaderHistory::remove_change_nts(
+        ReaderHistory::const_iterator removal,
+        bool release)
+{
+    if (removal != changesEnd())
+    {
+        CacheChange_t* p_sample = *removal;
+
+        if (!has_keys_ || p_sample->is_fully_assembled())
+        {
+            // clean any references to this CacheChange in the key state collection
+            auto it = keyed_changes_.find(p_sample->instanceHandle);
+
+            // if keyed and in history must be in the map
+            assert(it != keyed_changes_.end());
+
+            auto& c = it->second.cache_changes;
+            c.erase(std::remove(c.begin(), c.end(), p_sample), c.end());
+        }
+    }
+
+    // call the base class
+    return ReaderHistory::remove_change_nts(removal, release);
+}
+
+bool DataReaderHistory::completed_change(
+        CacheChange_t* change)
+{
+    bool ret_value = true;
+
+    if (!change->instanceHandle.isDefined())
+    {
+        InstanceCollection::iterator vit;
+        ret_value = compute_key_for_change_fn_(change) && find_key(change->instanceHandle, vit);
+        if (ret_value)
+        {
+            ret_value = !change->instanceHandle.isDefined() || complete_fn_(change, vit->second);
+        }
+
+        if (!ret_value)
+        {
+            const_iterator chit = find_change_nts(change);
+            if (chit != changesEnd())
+            {
+                m_isHistoryFull = false;
+                remove_change_nts(chit);
+            }
+            else
+            {
+                logError(SUBSCRIBER, "Change should exist but didn't find it");
+            }
+        }
+    }
+
+    return ret_value;
+}
+
+bool DataReaderHistory::completed_change_keep_all(
+        CacheChange_t* change,
+        DataReaderInstance& instance)
+{
+    DataReaderInstance::ChangeCollection& instance_changes = instance.cache_changes;
+    if (instance_changes.size() < static_cast<size_t>(resource_limited_qos_.max_samples_per_instance))
+    {
+        add_to_instance(change, instance);
+        return true;
+    }
+
+    logWarning(SUBSCRIBER, "Change not added due to maximum number of samples per instance");
+    return false;
+}
+
+bool DataReaderHistory::completed_change_keep_last(
+        CacheChange_t* change,
+        DataReaderInstance& instance)
+{
+    bool add = false;
+    DataReaderInstance::ChangeCollection& instance_changes = instance.cache_changes;
+    if (instance_changes.size() < static_cast<size_t>(history_qos_.depth))
+    {
+        add = true;
+    }
+    else
+    {
+        // Try to substitute the oldest sample.
+
+        // As the instance should be ordered following the presentation QoS, we can always remove the first one.
+        add = remove_change_sub(instance_changes.at(0));
+    }
+
+    if (add)
+    {
+        add_to_instance(change, instance);
+        return true;
+    }
+
+    return false;
+}
+
+void DataReaderHistory::update_instance_nts(
+        CacheChange_t* const change)
+{
+    InstanceCollection::iterator vit;
+    vit = keyed_changes_.find(change->instanceHandle);
+
+    assert(vit != keyed_changes_.end());
+    vit->second.update_state(change->kind, change->writerGUID);
+    change->reader_info.disposed_generation_count = vit->second.disposed_generation_count;
+    change->reader_info.no_writers_generation_count = vit->second.no_writers_generation_count;
+}
+
+void DataReaderHistory::writer_not_alive(
+        const fastrtps::rtps::GUID_t& writer_guid)
+{
+    for (auto& it : keyed_changes_)
+    {
+        it.second.writer_removed(writer_guid);
+    }
+}
+
+} // namespace detail
+} // namsepace dds
+} // namespace fastdds
+} // namsepace eprosima
diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp
new file mode 100644
index 00000000000..629ce5f8f1f
--- /dev/null
+++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp
@@ -0,0 +1,313 @@
+// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima).
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/**
+ * @file DataReaderHistory.hpp
+ */
+
+#ifndef _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERHISTORY_HPP_
+#define _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERHISTORY_HPP_
+
+#include <chrono>
+#include <functional>
+#include <map>
+#include <utility>
+
+#include <fastdds/dds/core/policy/QosPolicies.hpp>
+#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
+#include <fastdds/dds/subscriber/SampleInfo.hpp>
+
+#include <fastdds/dds/topic/TopicDataType.hpp>
+#include <fastdds/dds/topic/TopicDescription.hpp>
+#include <fastdds/dds/topic/TypeSupport.hpp>
+
+#include <fastdds/rtps/common/CacheChange.h>
+#include <fastdds/rtps/common/Guid.h>
+#include <fastdds/rtps/common/InstanceHandle.h>
+#include <fastdds/rtps/common/SequenceNumber.h>
+#include <fastdds/rtps/history/ReaderHistory.h>
+#include <fastdds/rtps/resources/ResourceManagement.h>
+
+#include <fastrtps/utils/fixed_size_string.hpp>
+#include <fastrtps/utils/collections/ResourceLimitedContainerConfig.hpp>
+
+#include "DataReaderInstance.hpp"
+
+namespace eprosima {
+namespace fastdds {
+namespace dds {
+namespace detail {
+
+/**
+ * Class DataReaderHistory, container of the different CacheChanges of a DataReader
+ */
+class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory
+{
+public:
+
+    using MemoryManagementPolicy_t = eprosima::fastrtps::rtps::MemoryManagementPolicy_t;
+    using InstanceHandle_t = eprosima::fastrtps::rtps::InstanceHandle_t;
+    using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t;
+    using GUID_t = eprosima::fastrtps::rtps::GUID_t;
+    using SequenceNumber_t = eprosima::fastrtps::rtps::SequenceNumber_t;
+
+    using instance_info = std::pair<InstanceHandle_t, DataReaderInstance*>;
+
+    /**
+     * Constructor. Requires information about the DataReader.
+     * @param type  Type information. Needed to know if the type is keyed, as long as the maximum serialized size.
+     * @param topic Topic description. Topic and type name are used on debug messages.
+     * @param qos   DataReaderQoS policy. History related limits are taken from here.
+     */
+    DataReaderHistory(
+            const TypeSupport& type,
+            const TopicDescription& topic,
+            const DataReaderQos& qos);
+
+    ~DataReaderHistory() override;
+
+    /**
+     * Remove a specific change from the history.
+     * No Thread Safe
+     * @param removal iterator to the CacheChange_t to remove.
+     * @param release defaults to true and hints if the CacheChange_t should return to the pool
+     * @return iterator to the next CacheChange_t or end iterator.
+     */
+    iterator remove_change_nts(
+            const_iterator removal,
+            bool release = true) override;
+
+    /**
+     * Check if a new change can be added to this history.
+     *
+     * @param [in]  writer_guid                    GUID of the writer where the change came from.
+     * @param [in]  total_payload_size             Total payload size of the incoming change.
+     * @param [in]  unknown_missing_changes_up_to  The number of changes from the same writer with a lower sequence
+     *                                             number that could potentially be received in the future.
+     * @param [out] will_never_be_accepted         When the method returns @c false, this parameter will inform
+     *                                             whether the change could be accepted in the future or not.
+     *
+     * @pre change should not be present in the history
+     *
+     * @return Whether a call to received_change will succeed when called with the same arguments.
+     */
+    bool can_change_be_added_nts(
+            const GUID_t& writer_guid,
+            uint32_t total_payload_size,
+            size_t unknown_missing_changes_up_to,
+            bool& will_never_be_accepted) const override;
+
+    /**
+     * Called when a change is received by the Subscriber. Will add the change to the history.
+     * @pre Change should not be already present in the history.
+     * @param[in] change The received change
+     * @param unknown_missing_changes_up_to Number of missing changes before this one
+     * @return
+     */
+    bool received_change(
+            CacheChange_t* change,
+            size_t unknown_missing_changes_up_to) override;
+
+    /**
+     * Called when a fragmented change is received completely by the Subscriber. Will find its instance and store it.
+     * @pre Change should be already present in the history.
+     * @param[in] change The received change
+     * @return
+     */
+    bool completed_change(
+            CacheChange_t* change) override;
+
+    /**
+     * @brief Returns information about the first untaken sample.
+     * @param [out] info SampleInfo structure to store first untaken sample information.
+     * @return true if sample info was returned. false if there is no sample to take.
+     */
+    bool get_first_untaken_info(
+            SampleInfo& info);
+
+    /**
+     * This method is called to remove a change from the SubscriberHistory.
+     * @param change Pointer to the CacheChange_t.
+     * @return True if removed.
+     */
+    bool remove_change_sub(
+            CacheChange_t* change);
+
+    /**
+     * This method is called to remove a change from the SubscriberHistory.
+     * @param [in]     change Pointer to the CacheChange_t.
+     * @param [in,out] it     Iterator pointing to change on input. Will point to next valid change on output.
+     * @return True if removed.
+     */
+    bool remove_change_sub(
+            CacheChange_t* change,
+            DataReaderInstance::ChangeCollection::iterator& it);
+
+    /**
+     * Called when a writer is unmatched from the reader holding this history.
+     *
+     * This method will remove all the changes on the history that came from the writer being unmatched and which have
+     * not yet been notified to the user.
+     *
+     * @param writer_guid        GUID of the writer being unmatched.
+     * @param last_notified_seq  Last sequence number from the specified writer that was notified to the user.
+     */
+    void writer_unmatched(
+            const GUID_t& writer_guid,
+            const SequenceNumber_t& last_notified_seq) override;
+
+    /**
+     * @brief A method to set the next deadline for the given instance
+     * @param handle The handle to the instance
+     * @param next_deadline_us The time point when the deadline will occur
+     * @return True if the deadline was set correctly
+     */
+    bool set_next_deadline(
+            const InstanceHandle_t& handle,
+            const std::chrono::steady_clock::time_point& next_deadline_us);
+
+    /**
+     * @brief A method to get the next instance handle that will miss the deadline and the time when the deadline will occur
+     * @param handle The handle to the instance
+     * @param next_deadline_us The time point when the instance will miss the deadline
+     * @return True if the deadline was retrieved successfully
+     */
+    bool get_next_deadline(
+            InstanceHandle_t& handle,
+            std::chrono::steady_clock::time_point& next_deadline_us);
+
+    /**
+     * @brief Get the list of changes corresponding to an instance handle.
+     * @param handle The handle to the instance.
+     * @param exact  Indicates if the handle should match exactly (true) or if the first instance greater than the
+     *               input handle should be returned.
+     * @return A pair where:
+     *         - @c first is a boolean indicating if an instance was found
+     *         - @c second is a pair where:
+     *           - @c first is the handle of the returned instance
+     *           - @c second is a pointer to a DataReaderInstance that holds information about the returned instance
+     *
+     * @remarks When used on a NO_KEY topic, an instance will only be returned when called with
+     *          `handle = HANDLE_NIL` and `exact = false`.
+     */
+    std::pair<bool, instance_info> lookup_instance(
+            const InstanceHandle_t& handle,
+            bool exact);
+
+    void update_instance_nts(
+            CacheChange_t* const change);
+
+    void writer_not_alive(
+            const fastrtps::rtps::GUID_t& writer_guid);
+
+    void check_and_remove_instance(
+            instance_info& instance_info);
+
+private:
+
+    using InstanceCollection = std::map<InstanceHandle_t, DataReaderInstance>;
+
+    //!Resource limits for allocating the array of changes per instance
+    eprosima::fastrtps::ResourceLimitedContainerConfig key_changes_allocation_;
+    //!Resource limits for allocating the array of alive writers per instance
+    eprosima::fastrtps::ResourceLimitedContainerConfig key_writers_allocation_;
+    //!Map where keys are instance handles and values vectors of cache changes
+    InstanceCollection keyed_changes_;
+    //!HistoryQosPolicy values.
+    HistoryQosPolicy history_qos_;
+    //!ResourceLimitsQosPolicy values.
+    ResourceLimitsQosPolicy resource_limited_qos_;
+    //!Topic name
+    fastrtps::string_255 topic_name_;
+    //!Type name
+    fastrtps::string_255 type_name_;
+    //!Whether the type has keys
+    bool has_keys_;
+    //!TopicDataType
+    fastdds::dds::TopicDataType* type_;
+
+    //!Type object to deserialize Key
+    void* get_key_object_;
+
+    /// Function processing a received change
+    std::function<bool(CacheChange_t*, size_t)> receive_fn_;
+    /// Function to compute the instance handle of a received change
+    std::function<bool(CacheChange_t*)> compute_key_for_change_fn_;
+    /// Function processing a completed fragmented change
+    std::function<bool(CacheChange_t*, DataReaderInstance&)> complete_fn_;
+
+    /**
+     * @brief Method that finds a key in m_keyedChanges or tries to add it if not found
+     * @param a_change The change to get the key from
+     * @param map_it A map iterator to the given key
+     * @return True if it was found or could be added to the map
+     */
+    bool find_key(
+            const InstanceHandle_t& handle,
+            InstanceCollection::iterator& map_it);
+
+    /**
+     * @name Variants of incoming change processing.
+     *       Will be called with the history mutex taken.
+     * @param[in] change The received change
+     * @param unknown_missing_changes_up_to Number of missing changes before this one
+     * @return
+     */
+    ///@{
+    bool received_change_keep_all(
+            CacheChange_t* change,
+            size_t unknown_missing_changes_up_to);
+
+    bool received_change_keep_last(
+            CacheChange_t* change,
+            size_t unknown_missing_changes_up_to);
+    ///@}
+
+    /**
+     * @name Variants of change reconstruction completion processing.
+     *       Will be called with the history mutex taken.
+     * @param change The change for which the last missing fragment has been processed.
+     * @param instance Instance where the change should be added.
+     * @return true when the change was added to the instance.
+     * @return false when the change could not be added to the instance and has been removed from the history.
+     */
+    ///@{
+    bool completed_change_keep_all(
+            CacheChange_t* change,
+            DataReaderInstance& instance);
+
+    bool completed_change_keep_last(
+            CacheChange_t* change,
+            DataReaderInstance& instance);
+    ///@}
+
+    bool add_received_change_with_key(
+            CacheChange_t* a_change,
+            DataReaderInstance& instance);
+
+    bool add_to_reader_history_if_not_full(
+            CacheChange_t* a_change);
+
+    void add_to_instance(
+            CacheChange_t* a_change,
+            DataReaderInstance& instance);
+
+};
+
+} // namespace detail
+} // namespace dds
+} // namespace fastdds
+} // namespace eprosima
+
+#endif  // _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERHISTORY_HPP_
diff --git a/src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp b/src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp
new file mode 100644
index 00000000000..ed5a4e267ac
--- /dev/null
+++ b/src/cpp/fastdds/subscriber/history/DataReaderInstance.hpp
@@ -0,0 +1,216 @@
+// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima).
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/**
+ * @file DataReaderInstance.hpp
+ */
+
+#ifndef _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERINSTANCE_HPP_
+#define _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERINSTANCE_HPP_
+
+#include <chrono>
+#include <cstdint>
+
+#include <fastdds/dds/subscriber/InstanceState.hpp>
+#include <fastdds/dds/subscriber/ViewState.hpp>
+
+#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
+
+#include "DataReaderCacheChange.hpp"
+
+namespace eprosima {
+namespace fastdds {
+namespace dds {
+namespace detail {
+
+/// Book-keeping information for an instance
+struct DataReaderInstance
+{
+    using ChangeCollection = eprosima::fastrtps::ResourceLimitedVector<DataReaderCacheChange, std::true_type>;
+    using WriterOwnership = std::pair<fastrtps::rtps::GUID_t, uint32_t>;
+    using WriterCollection = eprosima::fastrtps::ResourceLimitedVector<WriterOwnership, std::false_type>;
+
+    //! A vector of DataReader changes belonging to the same instance
+    ChangeCollection cache_changes;
+    //! The list of alive writers for this instance
+    WriterCollection alive_writers;
+    //! GUID and strength of the current maximum strength writer
+    WriterOwnership current_owner{ {}, 0 };
+    //! The time when the group will miss the deadline
+    std::chrono::steady_clock::time_point next_deadline_us;
+    //! Current view state of the instance
+    ViewStateKind view_state = ViewStateKind::NEW_VIEW_STATE;
+    //! Current instance state of the instance
+    InstanceStateKind instance_state = InstanceStateKind::ALIVE_INSTANCE_STATE;
+    //! Current disposed generation of the instance
+    int32_t disposed_generation_count = 0;
+    //! Current no_writers generation of the instance
+    int32_t no_writers_generation_count = 0;
+
+    DataReaderInstance(
+            const eprosima::fastrtps::ResourceLimitedContainerConfig& changes_allocation,
+            const eprosima::fastrtps::ResourceLimitedContainerConfig& writers_allocation)
+        : cache_changes(changes_allocation)
+        , alive_writers(writers_allocation)
+    {
+    }
+
+    bool update_state(
+            const fastrtps::rtps::ChangeKind_t change_kind,
+            const fastrtps::rtps::GUID_t& writer_guid,
+            const uint32_t ownership_strength = 0)
+    {
+        bool ret_val = false;
+
+        switch (change_kind)
+        {
+            case fastrtps::rtps::ALIVE:
+                ret_val = writer_alive(writer_guid, ownership_strength);
+                break;
+
+            case fastrtps::rtps::NOT_ALIVE_DISPOSED:
+                ret_val = writer_dispose(writer_guid, ownership_strength);
+                break;
+
+            case fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED:
+                ret_val = writer_dispose(writer_guid, ownership_strength);
+                ret_val |= writer_unregister(writer_guid);
+                break;
+
+            case fastrtps::rtps::NOT_ALIVE_UNREGISTERED:
+                ret_val = writer_unregister(writer_guid);
+                break;
+
+            default:
+                // TODO (Miguel C): log error / assert
+                break;
+        }
+
+        return ret_val;
+    }
+
+    bool writer_removed(
+            const fastrtps::rtps::GUID_t& writer_guid)
+    {
+        return writer_unregister(writer_guid);
+    }
+
+private:
+
+    bool writer_alive(
+            const fastrtps::rtps::GUID_t& writer_guid,
+            const uint32_t ownership_strength)
+    {
+        bool ret_val = false;
+
+        if (ownership_strength >= current_owner.second)
+        {
+            current_owner.first = writer_guid;
+            current_owner.second = ownership_strength;
+
+            if (InstanceStateKind::NOT_ALIVE_DISPOSED_INSTANCE_STATE == instance_state)
+            {
+                ++disposed_generation_count;
+                alive_writers.clear();
+                view_state = ViewStateKind::NEW_VIEW_STATE;
+                ret_val = true;
+            }
+            else if (InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE == instance_state)
+            {
+                ++no_writers_generation_count;
+                alive_writers.clear();
+                view_state = ViewStateKind::NEW_VIEW_STATE;
+                ret_val = true;
+            }
+
+            writer_set(writer_guid, ownership_strength);
+            instance_state = InstanceStateKind::ALIVE_INSTANCE_STATE;
+        }
+
+        return ret_val;
+    }
+
+    bool writer_dispose(
+            const fastrtps::rtps::GUID_t& writer_guid,
+            const uint32_t ownership_strength)
+    {
+        bool ret_val = false;
+
+        writer_set(writer_guid, ownership_strength);
+        if (ownership_strength >= current_owner.second)
+        {
+            current_owner.first = writer_guid;
+            current_owner.second = ownership_strength;
+
+            if (InstanceStateKind::ALIVE_INSTANCE_STATE == instance_state)
+            {
+                ret_val = true;
+                instance_state = InstanceStateKind::NOT_ALIVE_DISPOSED_INSTANCE_STATE;
+            }
+        }
+
+        return ret_val;
+    }
+
+    bool writer_unregister(
+            const fastrtps::rtps::GUID_t& writer_guid)
+    {
+        bool ret_val = false;
+
+        alive_writers.remove_if([&writer_guid](const WriterOwnership& item)
+                {
+                    return item.first == writer_guid;
+                });
+
+        if (writer_guid == current_owner.first)
+        {
+            current_owner.second = 0;
+            current_owner.first = fastrtps::rtps::c_Guid_Unknown;
+        }
+
+        if (alive_writers.empty() && (InstanceStateKind::ALIVE_INSTANCE_STATE == instance_state))
+        {
+            ret_val = true;
+            instance_state = InstanceStateKind::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
+        }
+
+        return ret_val;
+    }
+
+    void writer_set(
+            const fastrtps::rtps::GUID_t& writer_guid,
+            const uint32_t ownership_strength)
+    {
+        auto it = std::find_if(alive_writers.begin(), alive_writers.end(), [&writer_guid](const WriterOwnership& item)
+                        {
+                            return item.first == writer_guid;
+                        });
+        if (it == alive_writers.end())
+        {
+            alive_writers.emplace_back(writer_guid, ownership_strength);
+        }
+        else
+        {
+            it->second = ownership_strength;
+        }
+    }
+
+};
+
+} /* namespace detail */
+} /* namespace dds */
+} /* namespace fastdds */
+} /* namespace eprosima */
+
+#endif  // _FASTDDS_SUBSCRIBER_HISTORY_DATAREADERCACHECHANGE_HPP_
diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp
index 9de26f80dd5..39a7b4542d6 100644
--- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp
+++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp
@@ -693,58 +693,6 @@ bool SubscriberHistory::get_next_deadline(
     return false;
 }
 
-std::pair<bool, SubscriberHistory::instance_info> SubscriberHistory::lookup_instance(
-        const InstanceHandle_t& handle,
-        bool exact)
-{
-    if (topic_att_.getTopicKind() == NO_KEY)
-    {
-        if (handle.isDefined())
-        {
-            // NO_KEY topics can only return the ficticious instance.
-            // Execution can only get here for two reasons:
-            // - Looking for a specific instance (exact = true)
-            // - Looking for the next instance to the ficticious one (exact = false)
-            // In both cases, no instance should be returned
-            return { false, {InstanceHandle_t(), nullptr} };
-        }
-        else
-        {
-            if (exact)
-            {
-                // Looking for HANDLE_NIL, nothing to return
-                return { false, {InstanceHandle_t(), nullptr} };
-            }
-
-            // Looking for the first instance, return the ficticious one containing all changes
-            InstanceHandle_t tmp;
-            tmp.value[0] = 1;
-            return { true, {tmp, &m_changes} };
-        }
-    }
-
-    t_m_Inst_Caches::iterator it;
-
-    if (exact)
-    {
-        it = keyed_changes_.find(handle);
-    }
-    else
-    {
-        auto comp = [](const InstanceHandle_t& h, const std::pair<InstanceHandle_t, KeyedChanges>& it)
-                {
-                    return h < it.first;
-                };
-        it = std::upper_bound(keyed_changes_.begin(), keyed_changes_.end(), handle, comp);
-    }
-
-    if (it != keyed_changes_.end())
-    {
-        return { true, {it->first, &(it->second.cache_changes)} };
-    }
-    return { false, {InstanceHandle_t(), nullptr} };
-}
-
 ReaderHistory::iterator SubscriberHistory::remove_change_nts(
         ReaderHistory::const_iterator removal,
         bool release)
diff --git a/src/cpp/rtps/history/ReaderHistory.cpp b/src/cpp/rtps/history/ReaderHistory.cpp
index 6c4050d8e4a..a7baba15b44 100644
--- a/src/cpp/rtps/history/ReaderHistory.cpp
+++ b/src/cpp/rtps/history/ReaderHistory.cpp
@@ -158,38 +158,33 @@ History::iterator ReaderHistory::remove_change_nts(
     return ret_val;
 }
 
+void ReaderHistory::writer_unmatched(
+        const GUID_t& writer_guid,
+        const SequenceNumber_t& last_notified_seq)
+{
+    static_cast<void>(last_notified_seq);
+    remove_changes_with_pred(
+        [&writer_guid](CacheChange_t* ch)
+        {
+            return writer_guid == ch->writerGUID;
+        });
+}
+
 bool ReaderHistory::remove_changes_with_guid(
         const GUID_t& a_guid)
 {
-    std::vector<CacheChange_t*> changes_to_remove;
-
     if (mp_reader == nullptr || mp_mutex == nullptr)
     {
         logError(RTPS_READER_HISTORY, "You need to create a Reader with History before removing any changes");
         return false;
     }
 
-    {
-        //Lock scope
-        std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
-        for (std::vector<CacheChange_t*>::iterator chit = m_changes.begin(); chit != m_changes.end(); ++chit)
+    remove_changes_with_pred(
+        [a_guid](CacheChange_t* ch)
         {
-            if ((*chit)->writerGUID == a_guid)
-            {
-                changes_to_remove.push_back((*chit));
-            }
-        }
-    }//End lock scope
+            return a_guid == ch->writerGUID;
+        });
 
-    for (std::vector<CacheChange_t*>::iterator chit = changes_to_remove.begin(); chit != changes_to_remove.end();
-            ++chit)
-    {
-        if (!remove_change(*chit))
-        {
-            logError(RTPS_READER_HISTORY, "One of the cachechanged in the GUID removal bulk could not be removed");
-            return false;
-        }
-    }
     return true;
 }
 
diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp
index 3cb0c7939d9..74117e69ea2 100644
--- a/src/cpp/rtps/reader/StatefulReader.cpp
+++ b/src/cpp/rtps/reader/StatefulReader.cpp
@@ -285,7 +285,7 @@ bool StatefulReader::matched_writer_remove(
     if (is_alive_)
     {
         //Remove cachechanges belonging to the unmatched writer
-        mp_history->remove_changes_with_guid(writer_guid);
+        mp_history->writer_unmatched(writer_guid, get_last_notified(writer_guid));
 
         if (liveliness_lease_duration_ < c_TimeInfinite)
         {
@@ -802,34 +802,23 @@ bool StatefulReader::change_removed_by_history(
 {
     std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
 
-    if (is_alive_)
+    if (is_alive_ && a_change->is_fully_assembled())
     {
         if (wp != nullptr || matched_writer_lookup(a_change->writerGUID, &wp))
         {
-            if (a_change->is_fully_assembled())
-            {
-                if (!a_change->isRead && wp->available_changes_max() >= a_change->sequenceNumber)
-                {
-                    if (0 < total_unread_)
-                    {
-                        --total_unread_;
-                    }
-                }
-
-
-                wp->change_removed_from_history(a_change->sequenceNumber);
-            }
-            return true;
+            wp->change_removed_from_history(a_change->sequenceNumber);
         }
-        else
+
+        if (!a_change->isRead &&
+                get_last_notified(a_change->writerGUID) >= a_change->sequenceNumber)
         {
-            if (a_change->writerGUID.entityId != m_trustedWriterEntityId)
+            if (0 < total_unread_)
             {
-                // trusted entities messages mean no havoc
-                logError(RTPS_READER,
-                        " You should always find the WP associated with a change, something is very wrong");
+                --total_unread_;
             }
+
         }
+        return true;
     }
 
     //Simulate a datasharing notification to process any pending payloads that were waiting due to full history
@@ -1156,16 +1145,14 @@ bool StatefulReader::begin_sample_access_nts(
     const GUID_t& writer_guid = change->writerGUID;
     is_future_change = false;
 
-    if (!matched_writer_lookup(writer_guid, &wp))
-    {
-        return false;
-    }
-
-    SequenceNumber_t seq;
-    seq = wp->available_changes_max();
-    if (seq < change->sequenceNumber)
+    if (matched_writer_lookup(writer_guid, &wp))
     {
-        is_future_change = true;
+        SequenceNumber_t seq;
+        seq = wp->available_changes_max();
+        if (seq < change->sequenceNumber)
+        {
+            is_future_change = true;
+        }
     }
 
     return true;
@@ -1184,8 +1171,7 @@ void StatefulReader::change_read_by_user(
         WriterProxy* writer,
         bool mark_as_read)
 {
-    assert(writer != nullptr);
-    assert(change->writerGUID == writer->guid());
+    assert(!writer || change->writerGUID == writer->guid());
 
     // Mark change as read
     if (mark_as_read && !change->isRead)
@@ -1198,7 +1184,7 @@ void StatefulReader::change_read_by_user(
     }
 
     // If not datasharing, we are done
-    if (!writer->is_datasharing_writer())
+    if (!writer || !writer->is_datasharing_writer())
     {
         return;
     }
diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp
index 0d67b0dfe78..b65c073f409 100644
--- a/src/cpp/rtps/reader/StatelessReader.cpp
+++ b/src/cpp/rtps/reader/StatelessReader.cpp
@@ -178,7 +178,7 @@ bool StatelessReader::matched_writer_remove(
     std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
 
     //Remove cachechanges belonging to the unmatched writer
-    mp_history->remove_changes_with_guid(writer_guid);
+    mp_history->writer_unmatched(writer_guid, get_last_notified(writer_guid));
 
     if (liveliness_lease_duration_ < c_TimeInfinite)
     {
diff --git a/src/cpp/utils/SystemInfo.cpp b/src/cpp/utils/SystemInfo.cpp
index fb98234b3a1..a860540a96d 100644
--- a/src/cpp/utils/SystemInfo.cpp
+++ b/src/cpp/utils/SystemInfo.cpp
@@ -161,10 +161,11 @@ FileWatchHandle SystemInfo::watch_file(
                                break;
                        }
                    }));
-#endif // defined(_WIN32) || defined(__unix__)
+#else // defined(_WIN32) || defined(__unix__)
     static_cast<void>(filename);
     static_cast<void>(callback);
     return FileWatchHandle();
+#endif // defined(_WIN32) || defined(__unix__)
 }
 
 void SystemInfo::stop_watching_file(
diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp
index c2df5d695b5..0a27fbdbb40 100644
--- a/test/blackbox/api/dds-pim/PubSubReader.hpp
+++ b/test/blackbox/api/dds-pim/PubSubReader.hpp
@@ -1571,8 +1571,9 @@ class PubSubReader
             std::unique_lock<std::mutex> lock(mutex_);
 
             // Check order of changes.
-            ASSERT_LT(last_seq[info.instance_handle], info.sample_identity.sequence_number());
-            last_seq[info.instance_handle] = info.sample_identity.sequence_number();
+            LastSeqInfo seq_info{ info.instance_handle, info.sample_identity.writer_guid() };
+            ASSERT_LT(last_seq[seq_info], info.sample_identity.sequence_number());
+            last_seq[seq_info] = info.sample_identity.sequence_number();
 
             if (info.instance_state == eprosima::fastdds::dds::ALIVE_INSTANCE_STATE)
             {
@@ -1657,7 +1658,8 @@ class PubSubReader
     unsigned int participant_matched_;
     std::atomic<bool> receiving_;
     eprosima::fastdds::dds::TypeSupport type_;
-    std::map<eprosima::fastrtps::rtps::InstanceHandle_t, eprosima::fastrtps::rtps::SequenceNumber_t> last_seq;
+    using LastSeqInfo = std::pair<eprosima::fastrtps::rtps::InstanceHandle_t, eprosima::fastrtps::rtps::GUID_t>;
+    std::map<LastSeqInfo, eprosima::fastrtps::rtps::SequenceNumber_t> last_seq;
     size_t current_processed_count_;
     size_t number_samples_expected_;
     bool discovery_result_;
diff --git a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h
index 932e4c58de9..840a9e1bce1 100644
--- a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h
+++ b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h
@@ -133,10 +133,22 @@ class ReaderHistory
         return m_changes.erase(removal);
     }
 
+    virtual void writer_unmatched(
+            const GUID_t& /*writer_guid*/,
+            const SequenceNumber_t& /*last_notified_seq*/)
+    {
+    }
+
     HistoryAttributes m_att;
 
 protected:
 
+    template<typename Pred>
+    inline void remove_changes_with_pred(
+            Pred)
+    {
+    }
+
     RTPSReader* mp_reader;
     RecursiveTimedMutex* mp_mutex;
     std::vector<CacheChange_t*> m_changes;
diff --git a/test/unittest/dds/publisher/CMakeLists.txt b/test/unittest/dds/publisher/CMakeLists.txt
index 17fcb776640..3b5305eb46f 100644
--- a/test/unittest/dds/publisher/CMakeLists.txt
+++ b/test/unittest/dds/publisher/CMakeLists.txt
@@ -75,6 +75,7 @@ set(DATAWRITERTESTS_SOURCE DataWriterTests.cpp
     ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp
     ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReader.cpp
     ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReaderImpl.cpp
+    ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
     ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/DataReaderQos.cpp
     ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp
     ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/Subscriber.cpp
diff --git a/test/unittest/dds/status/CMakeLists.txt b/test/unittest/dds/status/CMakeLists.txt
index e68d3c33ee0..ae5a52d868f 100644
--- a/test/unittest/dds/status/CMakeLists.txt
+++ b/test/unittest/dds/status/CMakeLists.txt
@@ -36,6 +36,7 @@ set(LISTENERTESTS_SOURCE ListenerTests.cpp
     ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/SubscriberImpl.cpp
     ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReader.cpp
     ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReaderImpl.cpp
+    ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
     ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/SubscriberQos.cpp
     ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/DataReaderQos.cpp
     ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp
diff --git a/test/unittest/dds/subscriber/DataReaderTests.cpp b/test/unittest/dds/subscriber/DataReaderTests.cpp
index 12e029e7299..a1c76f15a5c 100644
--- a/test/unittest/dds/subscriber/DataReaderTests.cpp
+++ b/test/unittest/dds/subscriber/DataReaderTests.cpp
@@ -1550,6 +1550,311 @@ TEST_F(DataReaderTests, read_unread)
     }
 }
 
+TEST_F(DataReaderTests, sample_info)
+{
+    DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT;
+    reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
+    reader_qos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
+    reader_qos.history().kind = KEEP_LAST_HISTORY_QOS;
+    reader_qos.history().depth = 1;
+    reader_qos.resource_limits().max_instances = 2;
+    reader_qos.resource_limits().max_samples_per_instance = 1;
+    reader_qos.resource_limits().max_samples = 2;
+
+    create_entities(nullptr, reader_qos);
+    publisher_->delete_datawriter(data_writer_);
+    data_writer_ = nullptr;
+
+    struct TestCmd
+    {
+        enum Operation
+        {
+            WRITE, UNREGISTER, DISPOSE, CLOSE
+        };
+
+        size_t writer_index;
+        Operation operation;
+        size_t instance_index;
+    };
+
+    struct TestInstanceResult
+    {
+        ReturnCode_t ret_code;
+        ViewStateKind view_state;
+        InstanceStateKind instance_state;
+        int32_t disposed_generation_count;
+        int32_t no_writers_generation_count;
+    };
+
+    struct TestStep
+    {
+        std::vector<TestCmd> operations;
+        TestInstanceResult instance_state[2];
+    };
+
+    struct TestState
+    {
+        TestState(
+                TypeSupport& type,
+                Topic* topic,
+                Publisher* publisher)
+            : topic_(topic)
+            , publisher_(publisher)
+        {
+            writer_qos_ = DATAWRITER_QOS_DEFAULT;
+            writer_qos_.publish_mode().kind = SYNCHRONOUS_PUBLISH_MODE;
+            writer_qos_.reliability().kind = RELIABLE_RELIABILITY_QOS;
+            writer_qos_.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
+            writer_qos_.history().kind = KEEP_LAST_HISTORY_QOS;
+            writer_qos_.history().depth = 1;
+            writer_qos_.resource_limits().max_instances = 2;
+            writer_qos_.resource_limits().max_samples_per_instance = 1;
+            writer_qos_.resource_limits().max_samples = 2;
+            writer_qos_.writer_data_lifecycle().autodispose_unregistered_instances = false;
+
+            data_[0].index(1);
+            data_[1].index(2);
+
+            type.get_key(&data_[0], &handles_[0]);
+            type.get_key(&data_[1], &handles_[1]);
+        }
+
+        ~TestState()
+        {
+            close_writer(0);
+            close_writer(1);
+        }
+
+        void run_test(
+                DataReader* reader,
+                const std::vector<TestStep>& steps)
+        {
+            for (const TestStep& step : steps)
+            {
+                for (const TestCmd& cmd : step.operations)
+                {
+                    execute(cmd);
+                    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+                }
+                check(reader, 0, step.instance_state[0]);
+                check(reader, 1, step.instance_state[1]);
+            }
+
+        }
+
+        void execute(
+                const TestCmd& cmd)
+        {
+            DataWriter* writer = nullptr;
+            ReturnCode_t ret_code;
+
+            switch (cmd.operation)
+            {
+                case TestCmd::CLOSE:
+                    close_writer(cmd.writer_index);
+                    break;
+
+                case TestCmd::DISPOSE:
+                    writer = open_writer(cmd.writer_index);
+                    ret_code = writer->dispose(&data_[cmd.instance_index], handles_[cmd.instance_index]);
+                    EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret_code);
+                    break;
+
+                case TestCmd::UNREGISTER:
+                    writer = open_writer(cmd.writer_index);
+                    ret_code = writer->unregister_instance(&data_[cmd.instance_index], handles_[cmd.instance_index]);
+                    EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret_code);
+                    break;
+
+                case TestCmd::WRITE:
+                    writer = open_writer(cmd.writer_index);
+                    ret_code = writer->write(&data_[cmd.instance_index], handles_[cmd.instance_index]);
+                    EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret_code);
+                    break;
+            }
+        }
+
+        void check(
+                DataReader* reader,
+                size_t instance_index,
+                const TestInstanceResult& instance_result)
+        {
+            FooSeq values;
+            SampleInfoSeq infos;
+            ReturnCode_t ret_code;
+
+            ret_code = reader->read_instance(values, infos, LENGTH_UNLIMITED, handles_[instance_index]);
+            EXPECT_EQ(ret_code, instance_result.ret_code);
+            if (ReturnCode_t::RETCODE_OK == ret_code)
+            {
+                EXPECT_EQ(instance_result.instance_state, infos[0].instance_state);
+                EXPECT_EQ(instance_result.view_state, infos[0].view_state);
+                EXPECT_EQ(instance_result.disposed_generation_count, infos[0].disposed_generation_count);
+                EXPECT_EQ(instance_result.no_writers_generation_count, infos[0].no_writers_generation_count);
+                EXPECT_EQ(ReturnCode_t::RETCODE_OK, reader->return_loan(values, infos));
+            }
+        }
+
+    private:
+
+        Topic* topic_;
+        Publisher* publisher_;
+        DataWriterQos writer_qos_;
+        DataWriter* writers_[2] = { nullptr, nullptr };
+
+        InstanceHandle_t handles_[2];
+        FooType data_[2];
+
+        void close_writer(
+                size_t index)
+        {
+            DataWriter*& writer = writers_[index];
+            if (writer != nullptr)
+            {
+                publisher_->delete_datawriter(writer);
+                writer = nullptr;
+            }
+        }
+
+        DataWriter* open_writer(
+                size_t index)
+        {
+            DataWriter*& writer = writers_[index];
+            if (writer == nullptr)
+            {
+                writer = publisher_->create_datawriter(topic_, writer_qos_);
+            }
+            return writer;
+        }
+
+    };
+
+    static const std::vector<TestStep> steps =
+    {
+        {
+            // Instances have never been written
+            {},
+            {
+                {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0},
+                {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0},
+            }
+        },
+        {
+            // One writer writes on first instance => that instance should be NEW and ALIVE
+            { {0, TestCmd::WRITE, 0} },
+            {
+                {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0},
+                {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0},
+            }
+        },
+        {
+            // Same writer writes on first instance => instance becomes NOT_NEW
+            { {0, TestCmd::WRITE, 0} },
+            {
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0},
+                {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0},
+            }
+        },
+        {
+            // Same writer disposes first instance => instance becomes NOT_ALIVE_DISPOSED
+            { {0, TestCmd::DISPOSE, 0} },
+            {
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 0},
+                {ReturnCode_t::RETCODE_BAD_PARAMETER, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0},
+            }
+        },
+        {
+            // First writer writes second instance => NEW and ALIVE
+            // Second writer writes first instance => NEW and ALIVE
+            { {0, TestCmd::WRITE, 1}, {1, TestCmd::WRITE, 0} },
+            {
+                {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 0},
+                {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0},
+            }
+        },
+        {
+            // Both writers write on second instance => NOT_NEW and ALIVE
+            { {0, TestCmd::WRITE, 1}, {1, TestCmd::WRITE, 1} },
+            {
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 0},
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0},
+            }
+        },
+        {
+            // Second writer closes => first instance becomes NOT_ALIVE_NO_WRITERS
+            { {1, TestCmd::CLOSE, 0} },
+            {
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, 1, 0},
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 0},
+            }
+        },
+        {
+            // First writer unregisters second instance => NOT_ALIVE_NO_WRITERS
+            { {0, TestCmd::UNREGISTER, 1} },
+            {
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, 1, 0},
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, 0, 0},
+            }
+        },
+        {
+            // Both writers write both instances
+            { {0, TestCmd::WRITE, 0}, {1, TestCmd::WRITE, 0}, {0, TestCmd::WRITE, 1}, {1, TestCmd::WRITE, 1} },
+            {
+                {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 1},
+                {ReturnCode_t::RETCODE_OK, NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 1},
+            }
+        },
+        {
+            // Reading twice should return NOT_NEW
+            {},
+            {
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 1},
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 0, 1},
+            }
+        },
+        {
+            // 0 - Unregistering while having another alive writer should not change state
+            // 1 - Disposing while having another alive writer is always done
+            { {0, TestCmd::UNREGISTER, 0}, {1, TestCmd::DISPOSE, 1} },
+            {
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 1},
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 1},
+            }
+        },
+        {
+            // 0 - Writing and unregistering while having another alive writer should not change state
+            // 1 - Unregister a disposed instance should not change state
+            { {0, TestCmd::WRITE, 0}, {0, TestCmd::UNREGISTER, 1}, {1, TestCmd::UNREGISTER, 0} },
+            {
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, ALIVE_INSTANCE_STATE, 1, 1},
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 1},
+            }
+        },
+        {
+            // 0 - Closing both writers should return NOT_ALIVE_NO_WRITERS
+            // 1 - Closing both writers on a disposed instance should not change state
+            { {0, TestCmd::CLOSE, 0}, {1, TestCmd::CLOSE, 0} },
+            {
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, 1, 1},
+                {ReturnCode_t::RETCODE_OK, NOT_NEW_VIEW_STATE, NOT_ALIVE_DISPOSED_INSTANCE_STATE, 0, 1},
+            }
+        },
+    };
+
+    // Run test once
+    TestState state(type_, topic_, publisher_);
+    state.run_test(data_reader_, steps);
+
+    // Taking all data should remove instance information
+    FooSeq data;
+    SampleInfoSeq infos;
+    EXPECT_EQ(ReturnCode_t::RETCODE_OK, data_reader_->take(data, infos));
+    EXPECT_EQ(ReturnCode_t::RETCODE_OK, data_reader_->return_loan(data, infos));
+
+    // Run test again
+    state.run_test(data_reader_, steps);
+}
+
 /*
  * This type fails deserialization on odd samples
  */
@@ -1883,13 +2188,17 @@ TEST_F(DataReaderTests, check_key_history_wholesomeness_on_unmatch)
                 SampleInfoSeq infos;
 
                 res = data_reader_->take_instance(samples, infos, LENGTH_UNLIMITED, handle_ok_);
+
+                // If the DataWriter is destroyed only the non-notified samples must be removed
+                // this operation MUST succeed
+                ASSERT_EQ(res, ReturnCode_t::RETCODE_OK);
+
+                data_reader_->return_loan(samples, infos);
             });
 
     // Check if the thread hangs
     // wait for termination
     std::this_thread::sleep_for(std::chrono::milliseconds(500));
-    // check expected result, if query thread hangs res = ReturnCode_t::RETCODE_OK
-    ASSERT_NE(res, ReturnCode_t::RETCODE_OK);
     query.join();
 }
 
diff --git a/test/unittest/statistics/dds/CMakeLists.txt b/test/unittest/statistics/dds/CMakeLists.txt
index 1eca0b887c0..4ec673f91d4 100644
--- a/test/unittest/statistics/dds/CMakeLists.txt
+++ b/test/unittest/statistics/dds/CMakeLists.txt
@@ -129,6 +129,7 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS)
         ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp
         ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReader.cpp
         ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/DataReaderImpl.cpp
+        ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
         ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/DataReaderQos.cpp
         ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/qos/ReaderQos.cpp
         ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/subscriber/Subscriber.cpp
diff --git a/test/unittest/utils/ResourceLimitedVectorTests.cpp b/test/unittest/utils/ResourceLimitedVectorTests.cpp
index 60bf89b7eb5..98c2a581bad 100644
--- a/test/unittest/utils/ResourceLimitedVectorTests.cpp
+++ b/test/unittest/utils/ResourceLimitedVectorTests.cpp
@@ -21,14 +21,15 @@ using namespace eprosima::fastrtps;
 // some implementations of std::vector would enforce power of two capacities
 constexpr size_t NUM_ITEMS = 32;
 
-class ResourceLimitedVectorTests: public ::testing::Test
+class ResourceLimitedVectorTests : public ::testing::Test
 {
-    public:
-        const int testbed[NUM_ITEMS] = 
-        {
-             1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
-            17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32
-        };
+public:
+
+    const int testbed[NUM_ITEMS] =
+    {
+        1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
+        17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32
+    };
 };
 
 TEST_F(ResourceLimitedVectorTests, default_constructor)
@@ -59,6 +60,26 @@ TEST_F(ResourceLimitedVectorTests, default_constructor)
     // Should be empty
     ASSERT_TRUE(uut.empty());
     ASSERT_EQ(uut.capacity(), NUM_ITEMS);
+
+    // Add all items using insert
+    auto it = uut.cbegin();
+    for (int i : testbed)
+    {
+        it = uut.insert(it, i);
+        ASSERT_NE(it, uut.cend());
+        it = uut.insert(it, std::move(i));
+        ASSERT_NE(it, uut.cend());
+    }
+
+    // Vector should be filled
+    ASSERT_EQ(uut.size(), 2 * NUM_ITEMS);
+    ASSERT_EQ(uut.capacity(), 2 * NUM_ITEMS);
+
+    uut.clear();
+
+    // Should be empty but allocated
+    ASSERT_TRUE(uut.empty());
+    ASSERT_EQ(uut.capacity(), 2 * NUM_ITEMS);
 }
 
 TEST_F(ResourceLimitedVectorTests, static_config)
@@ -103,7 +124,7 @@ TEST_F(ResourceLimitedVectorTests, static_config)
 
 TEST_F(ResourceLimitedVectorTests, prealocated_growing_1_config)
 {
-    ResourceLimitedVector<int> uut(ResourceLimitedContainerConfig{ NUM_ITEMS, NUM_ITEMS*2, 1});
+    ResourceLimitedVector<int> uut(ResourceLimitedContainerConfig{ NUM_ITEMS, NUM_ITEMS * 2, 1});
 
     ASSERT_TRUE(uut.empty());
 
@@ -130,8 +151,8 @@ TEST_F(ResourceLimitedVectorTests, prealocated_growing_1_config)
     }
 
     // Vector should be filled
-    ASSERT_EQ(uut.size(), NUM_ITEMS*2);
-    ASSERT_EQ(uut.capacity(), NUM_ITEMS*2);
+    ASSERT_EQ(uut.size(), NUM_ITEMS * 2);
+    ASSERT_EQ(uut.capacity(), NUM_ITEMS * 2);
 
     // Adding more values should return error
     for (int i : testbed)
@@ -227,7 +248,10 @@ TEST_F(ResourceLimitedVectorTests, remove_if)
     ASSERT_EQ(uut.size(), NUM_ITEMS);
     ASSERT_EQ(uut.capacity(), NUM_ITEMS);
 
-    auto is_odd = [](int i) { return (i & 1) != 0; };
+    auto is_odd = [](int i)
+            {
+                return (i & 1) != 0;
+            };
 
     // Remove all odd items and check no errors
     for (size_t i = 0; i < NUM_ITEMS / 2; i++)
@@ -247,7 +271,9 @@ TEST_F(ResourceLimitedVectorTests, remove_if)
 }
 
 
-int main(int argc, char **argv)
+int main(
+        int argc,
+        char** argv)
 {
     testing::InitGoogleTest(&argc, argv);
     return RUN_ALL_TESTS();
diff --git a/versions.md b/versions.md
index 98da12cffff..b9bec8a1995 100644
--- a/versions.md
+++ b/versions.md
@@ -7,6 +7,8 @@ Forthcoming
   API, implies ABI break)
 * New DataWriter API allowing to wait for acknowledgements for a specific instance (extends DataWriter API, implies ABI
   break)
+* Generation of GUID on entity creation (ABI break on RTPS layer)
+* Adding DataReader history with correct implementation of instance_state and view_state (ABI break on RTPS layer)
 *
 
 Version 2.4.0