Skip to content

Commit

Permalink
DataReaderHistory (#2363)
Browse files Browse the repository at this point in the history
* Refactor DataReaderImpl to use a new DataReaderHistory class (#2177)

* Refs 12400. Duplicating SubscriberHistory into DataReaderHistory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Using DataReaderHistory on DataReaderImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Avoid using TopicAttributes.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Additional cleanup.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Using DDS SampleInfo.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12404. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Fixed DataReaderHistory (#2194)

* DataReader test for sample_info fields (#2193)

* Refs 12400. Initial test infrastructure.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Additional tests.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Tests are run twice with a take in the middle.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12469. Fixed warnings.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12469. Additional comments.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Adding implementation for instance_state and view_state (#2298)

* Refs 12400. Added DataReaderCacheChange.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Added DataReaderInstance.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. DataReaderHistory using new types.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Removing unnecessary method from SubscriberHistory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. ReadTakeCommand receives full instance information.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. ReadTakeCommand checks for instance states.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. ReadTakeCommand fills sample info from instance data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Added insert method to ResourceLimitedVector.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. DataReaderInstance uses ResourceLimitedVector.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. get_first_untaken_info takes sample info from instance data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Discard received change when older than oldest.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Fixing KEEP_ALL with keys.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Refactor to always use instances.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Basic structure for update instance state.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Adding alive_writers and current_owner to DataReaderInstance.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Implementing writer_alive.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Implementing writer_dispose.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Implementing writer_unregister.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Setting NOT_NEW on returned instances.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Correct return code when returning samples with no data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Set view_state to NEW when changing instance_state to ALIVE.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Moving generation counts into CacheChange_t.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Assigning generation counts after processing instance state.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Update instance_state when writer becomes not alive.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Clear alive_writers when changing generation.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Add writer_unmatched to ReaderHistory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. NOT_ALIVE_UNREGISTERED should not return valid data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Set autodispose_unregistered_instances to false on test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Remove instance when it becomes empty and is not alive.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Refactor into writer_not_alive.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Keeping samples from unmatched writers.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Avoid keeping non-notified samples.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Fixing compilation warnings on windows.

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 12758. Fixing assertion on WriterProxy logic. If we only notify fragmented DATA reception on completion we should only notify removal of fully assembled samples.

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 12758 Fixing DataReaderHistory test that checks DataWriter disposal behaviour.

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 12758. Use move semantics.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Use ResourceLimitedVector for writers.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Apply pre-allocation policies.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. PubSubReader. Account for writer_guid on last_seq checks.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Added can_change_be_added_nts.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Removed unused method.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Always use completed changes for key computation.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Fixed ResourceLimitedVector::insert.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Avoid dynamic allocation inside remove_changes_with_pred.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Optimization on DataReaderHistory::remove_change_nts.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Method writer_unmatched documented and improved.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Do not complete changes for non-keyed topics.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Do not remove incomplete changes for keyed topics.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Different removal policy on ReaderHistory and DataReaderHistory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Fix unused parameter warning.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Removed unused header.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Doxydoc improvements.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 12400. Adding checks for insert on ResourceLimitedVectorTests.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Fixed rvalue version of ResourceLimitedVector::insert.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Added feature to versions.md.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Fixed inclusion of DataReaderHistory.cpp on CMake files.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Add missing feature to versions.md.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: Miguel Barro <miguelbarro@eprosima.com>
  • Loading branch information
MiguelCompany and Miguel Barro authored Dec 21, 2021
1 parent b084f8b commit 2705118
Show file tree
Hide file tree
Showing 26 changed files with 1,835 additions and 247 deletions.
2 changes: 1 addition & 1 deletion include/fastdds/dds/subscriber/SampleInfo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ struct SampleInfo

//! the generation difference between the time the sample was received, and the time the most recent sample was received.
//! The most recent sample used for the calculation may or may not be in the returned collection
int32_t absoulte_generation_rank;
int32_t absolute_generation_rank;

//! time provided by the DataWriter when the sample was written
fastrtps::rtps::Time_t source_timestamp;
Expand Down
4 changes: 4 additions & 0 deletions include/fastdds/rtps/common/CacheChange.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ struct CacheChangeReaderInfo_t
{
//!Reception TimeStamp (only used in Readers)
Time_t receptionTimestamp;
//! Disposed generation of the instance when this entry was added to it
int32_t disposed_generation_count;
//! No-writers generation of the instance when this entry was added to it
int32_t no_writers_generation_count;
};

/**
Expand Down
28 changes: 28 additions & 0 deletions include/fastdds/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,19 @@ class ReaderHistory : public History
CacheChange_t** min_change,
const GUID_t& writerGuid);

/**
* Called when a writer is unmatched from the reader holding this history.
*
* This method will remove all the changes on the history that came from the writer being unmatched and which have
* not yet been notified to the user.
*
* @param writer_guid GUID of the writer being unmatched.
* @param last_notified_seq Last sequence number from the specified writer that was notified to the user.
*/
RTPS_DllAPI virtual void writer_unmatched(
const GUID_t& writer_guid,
const SequenceNumber_t& last_notified_seq);

protected:

RTPS_DllAPI bool do_reserve_cache(
Expand All @@ -161,6 +174,21 @@ class ReaderHistory : public History
RTPS_DllAPI void do_release_cache(
CacheChange_t* ch) override;

template<typename Pred>
inline void remove_changes_with_pred(
Pred pred)
{
assert(nullptr != mp_reader);
assert(nullptr != mp_mutex);

std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
std::vector<CacheChange_t*>::iterator new_end = std::remove_if(m_changes.begin(), m_changes.end(), pred);
while (new_end != m_changes.end())
{
new_end = remove_change_nts(new_end);
}
}

//!Pointer to the reader
RTPSReader* mp_reader;

Expand Down
21 changes: 0 additions & 21 deletions include/fastrtps/subscriber/SubscriberHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ class SubscriberHistory : public rtps::ReaderHistory
{
public:

using instance_info = std::pair<rtps::InstanceHandle_t, std::vector<rtps::CacheChange_t*>*>;

/**
* Constructor. Requires information about the subscriber.
* @param topic_att TopicAttributes.
Expand Down Expand Up @@ -179,25 +177,6 @@ class SubscriberHistory : public rtps::ReaderHistory
rtps::InstanceHandle_t& handle,
std::chrono::steady_clock::time_point& next_deadline_us);

/**
* @brief Get the list of changes corresponding to an instance handle.
* @param handle The handle to the instance.
* @param exact Indicates if the handle should match exactly (true) or if the first instance greater than the
* input handle should be returned.
* @return A pair where:
* - @c first is a boolean indicating if an instance was found
* - @c second is a pair where:
* - @c first is the handle of the returned instance
* - @c second is a pointer to a std::vector<rtps::CacheChange_t*> with the list of changes for the
* returned instance
*
* @remarks When used on a NO_KEY topic, an instance will only be returned when called with
* `handle = HANDLE_NIL` and `exact = false`.
*/
std::pair<bool, instance_info> lookup_instance(
const rtps::InstanceHandle_t& handle,
bool exact);

private:

using t_m_Inst_Caches = std::map<rtps::InstanceHandle_t, KeyedChanges>;
Expand Down
42 changes: 42 additions & 0 deletions include/fastrtps/utils/collections/ResourceLimitedVector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,48 @@ class ResourceLimitedVector
return *this;
}

/**
* Insert value before pos.
*
* @param pos iterator before which the content will be inserted. pos may be the end() iterator.
* @param value element value to insert.
*
* @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits.
*/
iterator insert(
const_iterator pos,
const value_type& value)
{
auto dist = std::distance(collection_.cbegin(), pos);
if (!ensure_capacity())
{
return end();
}

return collection_.insert(collection_.cbegin() + dist, value);
}

/**
* Insert value before pos.
*
* @param pos iterator before which the content will be inserted. pos may be the end() iterator.
* @param value element value to insert.
*
* @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits.
*/
iterator insert(
const_iterator pos,
value_type&& value)
{
auto dist = std::distance(collection_.cbegin(), pos);
if (!ensure_capacity())
{
return end();
}

return collection_.insert(collection_.cbegin() + dist, std::move(value));
}

/**
* Add element at the end.
*
Expand Down
3 changes: 1 addition & 2 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ set(${PROJECT_NAME}_source_files
fastrtps_deprecated/subscriber/Subscriber.cpp
fastrtps_deprecated/subscriber/SubscriberImpl.cpp
fastrtps_deprecated/subscriber/SubscriberHistory.cpp
fastdds/subscriber/DataReader.cpp
fastdds/publisher/DataWriter.cpp
fastdds/subscriber/DataReaderImpl.cpp
fastdds/publisher/DataWriterImpl.cpp
fastdds/topic/ContentFilteredTopic.cpp
fastdds/topic/Topic.cpp
Expand All @@ -105,6 +103,7 @@ set(${PROJECT_NAME}_source_files
fastdds/subscriber/Subscriber.cpp
fastdds/subscriber/DataReader.cpp
fastdds/subscriber/DataReaderImpl.cpp
fastdds/subscriber/history/DataReaderHistory.cpp
fastdds/domain/DomainParticipantFactory.cpp
fastdds/domain/DomainParticipantImpl.cpp
fastdds/domain/DomainParticipant.cpp
Expand Down
59 changes: 15 additions & 44 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,40 +62,6 @@ namespace eprosima {
namespace fastdds {
namespace dds {

static void sample_info_to_dds (
const SampleInfo_t& rtps_info,
SampleInfo* dds_info)
{
dds_info->sample_state = NOT_READ_SAMPLE_STATE;
dds_info->view_state = NOT_NEW_VIEW_STATE;
dds_info->disposed_generation_count = 0;
dds_info->no_writers_generation_count = 1;
dds_info->sample_rank = 0;
dds_info->generation_rank = 0;
dds_info->absoulte_generation_rank = 0;
dds_info->source_timestamp = rtps_info.sourceTimestamp;
dds_info->reception_timestamp = rtps_info.receptionTimestamp;
dds_info->instance_handle = rtps_info.iHandle;
dds_info->publication_handle = fastrtps::rtps::InstanceHandle_t(rtps_info.sample_identity.writer_guid());
dds_info->sample_identity = rtps_info.sample_identity;
dds_info->related_sample_identity = rtps_info.related_sample_identity;
dds_info->valid_data = rtps_info.sampleKind == eprosima::fastrtps::rtps::ALIVE ? true : false;

switch (rtps_info.sampleKind)
{
case eprosima::fastrtps::rtps::ALIVE:
dds_info->instance_state = ALIVE_INSTANCE_STATE;
break;
case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED:
dds_info->instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE;
break;
default:
//TODO [ILG] change this if the other kinds ever get implemented
dds_info->instance_state = ALIVE_INSTANCE_STATE;
break;
}
}

static bool collections_have_same_properties(
const LoanableCollection& data_values,
const SampleInfoSeq& sample_infos)
Expand Down Expand Up @@ -131,11 +97,7 @@ DataReaderImpl::DataReaderImpl(
, topic_(topic)
, qos_(&qos == &DATAREADER_QOS_DEFAULT ? subscriber_->get_default_datareader_qos() : qos)
#pragma warning (disable : 4355 )
, history_(topic_attributes(),
type_.get(),
qos_.get_readerqos(subscriber_->get_qos()),
type_->m_typeSize + 3, /* Possible alignment */
qos_.endpoint().history_memory_policy)
, history_(type, *topic, qos_)
, listener_(listener)
, reader_listener_(this)
, deadline_duration_us_(qos_.deadline().period.to_ns() * 1e-3)
Expand Down Expand Up @@ -714,10 +676,8 @@ ReturnCode_t DataReaderImpl::get_first_untaken_info(
return ReturnCode_t::RETCODE_NOT_ENABLED;
}

SampleInfo_t rtps_info;
if (history_.get_first_untaken_info(&rtps_info))
if (history_.get_first_untaken_info(*info))
{
sample_info_to_dds(rtps_info, info);
return ReturnCode_t::RETCODE_OK;
}
return ReturnCode_t::RETCODE_NO_DATA;
Expand Down Expand Up @@ -892,10 +852,10 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos(
bool DataReaderImpl::on_new_cache_change_added(
const CacheChange_t* const change)
{
std::lock_guard<RecursiveTimedMutex> guard(reader_->getMutex());

if (qos_.deadline().period != c_TimeInfinite)
{
std::unique_lock<RecursiveTimedMutex> lock(reader_->getMutex());

if (!history_.set_next_deadline(
change->instanceHandle,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
Expand All @@ -913,6 +873,7 @@ bool DataReaderImpl::on_new_cache_change_added(
}

CacheChange_t* new_change = const_cast<CacheChange_t*>(change);
history_.update_instance_nts(new_change);

if (qos_.lifespan().duration == c_TimeInfinite)
{
Expand Down Expand Up @@ -968,6 +929,11 @@ void DataReaderImpl::update_subscription_matched_status(
subscription_matched_status_.last_publication_handle = status.last_publication_handle;
}

if (count_change < 0)
{
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
}

StatusMask notify_status = StatusMask::subscription_matched();
DataReaderListener* listener = get_listener_for(notify_status);
if (listener != nullptr)
Expand Down Expand Up @@ -1235,6 +1201,11 @@ RequestedIncompatibleQosStatus& DataReaderImpl::update_requested_incompatible_qo
LivelinessChangedStatus& DataReaderImpl::update_liveliness_status(
const fastrtps::LivelinessChangedStatus& status)
{
if (0 < status.not_alive_count_change)
{
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
}

liveliness_changed_status_.alive_count = status.alive_count;
liveliness_changed_status_.not_alive_count = status.not_alive_count;
liveliness_changed_status_.alive_count_change += status.alive_count_change;
Expand Down
5 changes: 3 additions & 2 deletions src/cpp/fastdds/subscriber/DataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include <fastdds/rtps/reader/ReaderListener.h>

#include <fastrtps/attributes/TopicAttributes.h>
#include <fastrtps/subscriber/SubscriberHistory.h>
#include <fastrtps/qos/LivelinessChangedStatus.h>
#include <fastrtps/types/TypesBase.h>

Expand All @@ -46,6 +45,8 @@
#include <fastdds/subscriber/SubscriberImpl.hpp>
#include <rtps/history/ITopicPayloadPool.h>

#include <fastdds/subscriber/history/DataReaderHistory.hpp>

using eprosima::fastrtps::types::ReturnCode_t;

namespace eprosima {
Expand Down Expand Up @@ -331,7 +332,7 @@ class DataReaderImpl
DataReaderQos qos_;

//!History
fastrtps::SubscriberHistory history_;
detail::DataReaderHistory history_;

//!Listener
DataReaderListener* listener_ = nullptr;
Expand Down
Loading

0 comments on commit 2705118

Please sign in to comment.