Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataReaderHistory [12400] #2363

Merged
merged 10 commits into from
Dec 21, 2021
2 changes: 1 addition & 1 deletion include/fastdds/dds/subscriber/SampleInfo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ struct SampleInfo

//! the generation difference between the time the sample was received, and the time the most recent sample was received.
//! The most recent sample used for the calculation may or may not be in the returned collection
int32_t absoulte_generation_rank;
int32_t absolute_generation_rank;

//! time provided by the DataWriter when the sample was written
fastrtps::rtps::Time_t source_timestamp;
Expand Down
4 changes: 4 additions & 0 deletions include/fastdds/rtps/common/CacheChange.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ struct CacheChangeReaderInfo_t
{
//!Reception TimeStamp (only used in Readers)
Time_t receptionTimestamp;
//! Disposed generation of the instance when this entry was added to it
int32_t disposed_generation_count;
//! No-writers generation of the instance when this entry was added to it
int32_t no_writers_generation_count;
};

/**
Expand Down
28 changes: 28 additions & 0 deletions include/fastdds/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,19 @@ class ReaderHistory : public History
CacheChange_t** min_change,
const GUID_t& writerGuid);

/**
* Called when a writer is unmatched from the reader holding this history.
*
* This method will remove all the changes on the history that came from the writer being unmatched and which have
* not yet been notified to the user.
*
* @param writer_guid GUID of the writer being unmatched.
* @param last_notified_seq Last sequence number from the specified writer that was notified to the user.
*/
RTPS_DllAPI virtual void writer_unmatched(
const GUID_t& writer_guid,
const SequenceNumber_t& last_notified_seq);

protected:

RTPS_DllAPI bool do_reserve_cache(
Expand All @@ -161,6 +174,21 @@ class ReaderHistory : public History
RTPS_DllAPI void do_release_cache(
CacheChange_t* ch) override;

template<typename Pred>
inline void remove_changes_with_pred(
Pred pred)
{
assert(nullptr != mp_reader);
assert(nullptr != mp_mutex);

std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
std::vector<CacheChange_t*>::iterator new_end = std::remove_if(m_changes.begin(), m_changes.end(), pred);
while (new_end != m_changes.end())
{
new_end = remove_change_nts(new_end);
}
}

//!Pointer to the reader
RTPSReader* mp_reader;

Expand Down
21 changes: 0 additions & 21 deletions include/fastrtps/subscriber/SubscriberHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ class SubscriberHistory : public rtps::ReaderHistory
{
public:

using instance_info = std::pair<rtps::InstanceHandle_t, std::vector<rtps::CacheChange_t*>*>;

/**
* Constructor. Requires information about the subscriber.
* @param topic_att TopicAttributes.
Expand Down Expand Up @@ -179,25 +177,6 @@ class SubscriberHistory : public rtps::ReaderHistory
rtps::InstanceHandle_t& handle,
std::chrono::steady_clock::time_point& next_deadline_us);

/**
* @brief Get the list of changes corresponding to an instance handle.
* @param handle The handle to the instance.
* @param exact Indicates if the handle should match exactly (true) or if the first instance greater than the
* input handle should be returned.
* @return A pair where:
* - @c first is a boolean indicating if an instance was found
* - @c second is a pair where:
* - @c first is the handle of the returned instance
* - @c second is a pointer to a std::vector<rtps::CacheChange_t*> with the list of changes for the
* returned instance
*
* @remarks When used on a NO_KEY topic, an instance will only be returned when called with
* `handle = HANDLE_NIL` and `exact = false`.
*/
std::pair<bool, instance_info> lookup_instance(
const rtps::InstanceHandle_t& handle,
bool exact);

private:

using t_m_Inst_Caches = std::map<rtps::InstanceHandle_t, KeyedChanges>;
Expand Down
42 changes: 42 additions & 0 deletions include/fastrtps/utils/collections/ResourceLimitedVector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,48 @@ class ResourceLimitedVector
return *this;
}

/**
* Insert value before pos.
*
* @param pos iterator before which the content will be inserted. pos may be the end() iterator.
* @param value element value to insert.
*
* @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits.
*/
iterator insert(
const_iterator pos,
const value_type& value)
{
auto dist = std::distance(collection_.cbegin(), pos);
if (!ensure_capacity())
{
return end();
}

return collection_.insert(collection_.cbegin() + dist, value);
}

/**
* Insert value before pos.
*
* @param pos iterator before which the content will be inserted. pos may be the end() iterator.
* @param value element value to insert.
*
* @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits.
*/
iterator insert(
const_iterator pos,
value_type&& value)
{
auto dist = std::distance(collection_.cbegin(), pos);
if (!ensure_capacity())
{
return end();
}

return collection_.insert(collection_.cbegin() + dist, std::move(value));
}

/**
* Add element at the end.
*
Expand Down
3 changes: 1 addition & 2 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ set(${PROJECT_NAME}_source_files
fastrtps_deprecated/subscriber/Subscriber.cpp
fastrtps_deprecated/subscriber/SubscriberImpl.cpp
fastrtps_deprecated/subscriber/SubscriberHistory.cpp
fastdds/subscriber/DataReader.cpp
fastdds/publisher/DataWriter.cpp
fastdds/subscriber/DataReaderImpl.cpp
fastdds/publisher/DataWriterImpl.cpp
fastdds/topic/Topic.cpp
fastdds/topic/TopicImpl.cpp
Expand All @@ -104,6 +102,7 @@ set(${PROJECT_NAME}_source_files
fastdds/subscriber/Subscriber.cpp
fastdds/subscriber/DataReader.cpp
fastdds/subscriber/DataReaderImpl.cpp
fastdds/subscriber/history/DataReaderHistory.cpp
fastdds/domain/DomainParticipantFactory.cpp
fastdds/domain/DomainParticipantImpl.cpp
fastdds/domain/DomainParticipant.cpp
Expand Down
59 changes: 15 additions & 44 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,40 +57,6 @@ namespace eprosima {
namespace fastdds {
namespace dds {

static void sample_info_to_dds (
const SampleInfo_t& rtps_info,
SampleInfo* dds_info)
{
dds_info->sample_state = NOT_READ_SAMPLE_STATE;
dds_info->view_state = NOT_NEW_VIEW_STATE;
dds_info->disposed_generation_count = 0;
dds_info->no_writers_generation_count = 1;
dds_info->sample_rank = 0;
dds_info->generation_rank = 0;
dds_info->absoulte_generation_rank = 0;
dds_info->source_timestamp = rtps_info.sourceTimestamp;
dds_info->reception_timestamp = rtps_info.receptionTimestamp;
dds_info->instance_handle = rtps_info.iHandle;
dds_info->publication_handle = fastrtps::rtps::InstanceHandle_t(rtps_info.sample_identity.writer_guid());
dds_info->sample_identity = rtps_info.sample_identity;
dds_info->related_sample_identity = rtps_info.related_sample_identity;
dds_info->valid_data = rtps_info.sampleKind == eprosima::fastrtps::rtps::ALIVE ? true : false;

switch (rtps_info.sampleKind)
{
case eprosima::fastrtps::rtps::ALIVE:
dds_info->instance_state = ALIVE_INSTANCE_STATE;
break;
case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED:
dds_info->instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE;
break;
default:
//TODO [ILG] change this if the other kinds ever get implemented
dds_info->instance_state = ALIVE_INSTANCE_STATE;
break;
}
}

static bool collections_have_same_properties(
const LoanableCollection& data_values,
const SampleInfoSeq& sample_infos)
Expand Down Expand Up @@ -126,11 +92,7 @@ DataReaderImpl::DataReaderImpl(
, topic_(topic)
, qos_(&qos == &DATAREADER_QOS_DEFAULT ? subscriber_->get_default_datareader_qos() : qos)
#pragma warning (disable : 4355 )
, history_(topic_attributes(),
type_.get(),
qos_.get_readerqos(subscriber_->get_qos()),
type_->m_typeSize + 3, /* Possible alignment */
qos_.endpoint().history_memory_policy)
, history_(type, *topic, qos_)
, listener_(listener)
, reader_listener_(this)
, deadline_duration_us_(qos_.deadline().period.to_ns() * 1e-3)
Expand Down Expand Up @@ -703,10 +665,8 @@ ReturnCode_t DataReaderImpl::get_first_untaken_info(
return ReturnCode_t::RETCODE_NOT_ENABLED;
}

SampleInfo_t rtps_info;
if (history_.get_first_untaken_info(&rtps_info))
if (history_.get_first_untaken_info(*info))
{
sample_info_to_dds(rtps_info, info);
return ReturnCode_t::RETCODE_OK;
}
return ReturnCode_t::RETCODE_NO_DATA;
Expand Down Expand Up @@ -881,10 +841,10 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos(
bool DataReaderImpl::on_new_cache_change_added(
const CacheChange_t* const change)
{
std::lock_guard<RecursiveTimedMutex> guard(reader_->getMutex());

if (qos_.deadline().period != c_TimeInfinite)
{
std::unique_lock<RecursiveTimedMutex> lock(reader_->getMutex());

if (!history_.set_next_deadline(
change->instanceHandle,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
Expand All @@ -902,6 +862,7 @@ bool DataReaderImpl::on_new_cache_change_added(
}

CacheChange_t* new_change = const_cast<CacheChange_t*>(change);
history_.update_instance_nts(new_change);

if (qos_.lifespan().duration == c_TimeInfinite)
{
Expand Down Expand Up @@ -957,6 +918,11 @@ void DataReaderImpl::update_subscription_matched_status(
subscription_matched_status_.last_publication_handle = status.last_publication_handle;
}

if (count_change < 0)
{
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
}

StatusMask notify_status = StatusMask::subscription_matched();
DataReaderListener* listener = get_listener_for(notify_status);
if (listener != nullptr)
Expand Down Expand Up @@ -1224,6 +1190,11 @@ RequestedIncompatibleQosStatus& DataReaderImpl::update_requested_incompatible_qo
LivelinessChangedStatus& DataReaderImpl::update_liveliness_status(
const fastrtps::LivelinessChangedStatus& status)
{
if (0 < status.not_alive_count_change)
{
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
}

liveliness_changed_status_.alive_count = status.alive_count;
liveliness_changed_status_.not_alive_count = status.not_alive_count;
liveliness_changed_status_.alive_count_change += status.alive_count_change;
Expand Down
5 changes: 3 additions & 2 deletions src/cpp/fastdds/subscriber/DataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include <fastdds/rtps/reader/ReaderListener.h>

#include <fastrtps/attributes/TopicAttributes.h>
#include <fastrtps/subscriber/SubscriberHistory.h>
#include <fastrtps/qos/LivelinessChangedStatus.h>
#include <fastrtps/types/TypesBase.h>

Expand All @@ -46,6 +45,8 @@
#include <fastdds/subscriber/SubscriberImpl.hpp>
#include <rtps/history/ITopicPayloadPool.h>

#include <fastdds/subscriber/history/DataReaderHistory.hpp>

using eprosima::fastrtps::types::ReturnCode_t;

namespace eprosima {
Expand Down Expand Up @@ -331,7 +332,7 @@ class DataReaderImpl
DataReaderQos qos_;

//!History
fastrtps::SubscriberHistory history_;
detail::DataReaderHistory history_;

//!Listener
DataReaderListener* listener_ = nullptr;
Expand Down
Loading