Skip to content

Commit

Permalink
Adding implementation for instance_state and view_state (#2298)
Browse files Browse the repository at this point in the history
* Refs 12400. Added DataReaderCacheChange.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Added DataReaderInstance.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. DataReaderHistory using new types.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Removing unnecessary method from SubscriberHistory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. ReadTakeCommand receives full instance information.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. ReadTakeCommand checks for instance states.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. ReadTakeCommand fills sample info from instance data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Added insert method to ResourceLimitedVector.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. DataReaderInstance uses ResourceLimitedVector.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. get_first_untaken_info takes sample info from instance data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Discard received change when older than oldest.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Fixing KEEP_ALL with keys.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Refactor to always use instances.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Basic structure for update instance state.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Adding alive_writers and current_owner to DataReaderInstance.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Implementing writer_alive.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Implementing writer_dispose.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Implementing writer_unregister.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Setting NOT_NEW on returned instances.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Correct return code when returning samples with no data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Set view_state to NEW when changing instance_state to ALIVE.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Moving generation counts into CacheChange_t.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Assigning generation counts after processing instance state.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Update instance_state when writer becomes not alive.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Clear alive_writers when changing generation.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Add writer_unmatched to ReaderHistory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. NOT_ALIVE_UNREGISTERED should not return valid data.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Set autodispose_unregistered_instances to false on test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Remove instance when it becomes empty and is not alive.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Refactor into writer_not_alive.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Keeping samples from unmatched writers.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Avoid keeping non-notified samples.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12400. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Fixing compilation warnings on windows.

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 12758. Fixing assertion on WriterProxy logic. If we only notify fragmented DATA reception on completion we should only notify removal of fully assembled samples.

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 12758 Fixing DataReaderHistory test that checks DataWriter disposal behaviour.

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 12758. Use move semantics.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Use ResourceLimitedVector for writers.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Apply pre-allocation policies.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. PubSubReader. Account for writer_guid on last_seq checks.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Added can_change_be_added_nts.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Removed unused method.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Always use completed changes for key computation.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Fixed ResourceLimitedVector::insert.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Avoid dynamic allocation inside remove_changes_with_pred.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Optimization on DataReaderHistory::remove_change_nts.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Method writer_unmatched documented and improved.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Do not complete changes for non-keyed topics.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Do not remove incomplete changes for keyed topics.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Different removal policy on ReaderHistory and DataReaderHistory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Fix unused parameter warning.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Removed unused header.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Doxydoc improvements.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 12758. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: Miguel Barro <miguelbarro@eprosima.com>
  • Loading branch information
MiguelCompany and Miguel Barro committed Dec 20, 2021
1 parent 3d4b379 commit d03d6ea
Show file tree
Hide file tree
Showing 19 changed files with 889 additions and 474 deletions.
2 changes: 1 addition & 1 deletion include/fastdds/dds/subscriber/SampleInfo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ struct SampleInfo

//! the generation difference between the time the sample was received, and the time the most recent sample was received.
//! The most recent sample used for the calculation may or may not be in the returned collection
int32_t absoulte_generation_rank;
int32_t absolute_generation_rank;

//! time provided by the DataWriter when the sample was written
fastrtps::rtps::Time_t source_timestamp;
Expand Down
4 changes: 4 additions & 0 deletions include/fastdds/rtps/common/CacheChange.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ struct CacheChangeReaderInfo_t
{
//!Reception TimeStamp (only used in Readers)
Time_t receptionTimestamp;
//! Disposed generation of the instance when this entry was added to it
int32_t disposed_generation_count;
//! No-writers generation of the instance when this entry was added to it
int32_t no_writers_generation_count;
};

/**
Expand Down
28 changes: 28 additions & 0 deletions include/fastdds/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,19 @@ class ReaderHistory : public History
CacheChange_t** min_change,
const GUID_t& writerGuid);

/**
* Called when a writer is unmatched from the reader holding this history.
*
* This method will remove all the changes on the history that came from the writer being unmatched and which have
* not yet been notified to the user.
*
* @param writer_guid GUID of the writer being unmatched.
* @param last_notified_seq Last sequence number from the specified writer that was notified to the user.
*/
RTPS_DllAPI virtual void writer_unmatched(
const GUID_t& writer_guid,
const SequenceNumber_t& last_notified_seq);

protected:

RTPS_DllAPI bool do_reserve_cache(
Expand All @@ -161,6 +174,21 @@ class ReaderHistory : public History
RTPS_DllAPI void do_release_cache(
CacheChange_t* ch) override;

template<typename Pred>
inline void remove_changes_with_pred(
Pred pred)
{
assert(nullptr != mp_reader);
assert(nullptr != mp_mutex);

std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
std::vector<CacheChange_t*>::iterator new_end = std::remove_if(m_changes.begin(), m_changes.end(), pred);
while (new_end != m_changes.end())
{
new_end = remove_change_nts(new_end);
}
}

//!Pointer to the reader
RTPSReader* mp_reader;

Expand Down
21 changes: 0 additions & 21 deletions include/fastrtps/subscriber/SubscriberHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ class SubscriberHistory : public rtps::ReaderHistory
{
public:

using instance_info = std::pair<rtps::InstanceHandle_t, std::vector<rtps::CacheChange_t*>*>;

/**
* Constructor. Requires information about the subscriber.
* @param topic_att TopicAttributes.
Expand Down Expand Up @@ -179,25 +177,6 @@ class SubscriberHistory : public rtps::ReaderHistory
rtps::InstanceHandle_t& handle,
std::chrono::steady_clock::time_point& next_deadline_us);

/**
* @brief Get the list of changes corresponding to an instance handle.
* @param handle The handle to the instance.
* @param exact Indicates if the handle should match exactly (true) or if the first instance greater than the
* input handle should be returned.
* @return A pair where:
* - @c first is a boolean indicating if an instance was found
* - @c second is a pair where:
* - @c first is the handle of the returned instance
* - @c second is a pointer to a std::vector<rtps::CacheChange_t*> with the list of changes for the
* returned instance
*
* @remarks When used on a NO_KEY topic, an instance will only be returned when called with
* `handle = HANDLE_NIL` and `exact = false`.
*/
std::pair<bool, instance_info> lookup_instance(
const rtps::InstanceHandle_t& handle,
bool exact);

private:

using t_m_Inst_Caches = std::map<rtps::InstanceHandle_t, KeyedChanges>;
Expand Down
41 changes: 41 additions & 0 deletions include/fastrtps/utils/collections/ResourceLimitedVector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,47 @@ class ResourceLimitedVector
return *this;
}

/**
* Insert value before pos.
*
* @param pos iterator before which the content will be inserted. pos may be the end() iterator.
* @param value element value to insert.
*
* @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits.
*/
iterator insert(
const_iterator pos,
const value_type& value)
{
auto dist = std::distance(collection_.cbegin(), pos);
if (!ensure_capacity())
{
return end();
}

return collection_.insert(collection_.cbegin() + dist, value);
}

/**
* Insert value before pos.
*
* @param pos iterator before which the content will be inserted. pos may be the end() iterator.
* @param value element value to insert.
*
* @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits.
*/
iterator insert(
const_iterator pos,
value_type&& value)
{
if (!ensure_capacity())
{
return end();
}

return collection_.insert(pos, std::move(value));
}

/**
* Add element at the end.
*
Expand Down
15 changes: 13 additions & 2 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -841,10 +841,10 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos(
bool DataReaderImpl::on_new_cache_change_added(
const CacheChange_t* const change)
{
std::lock_guard<RecursiveTimedMutex> guard(reader_->getMutex());

if (qos_.deadline().period != c_TimeInfinite)
{
std::unique_lock<RecursiveTimedMutex> lock(reader_->getMutex());

if (!history_.set_next_deadline(
change->instanceHandle,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
Expand All @@ -862,6 +862,7 @@ bool DataReaderImpl::on_new_cache_change_added(
}

CacheChange_t* new_change = const_cast<CacheChange_t*>(change);
history_.update_instance_nts(new_change);

if (qos_.lifespan().duration == c_TimeInfinite)
{
Expand Down Expand Up @@ -917,6 +918,11 @@ void DataReaderImpl::update_subscription_matched_status(
subscription_matched_status_.last_publication_handle = status.last_publication_handle;
}

if (count_change < 0)
{
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
}

StatusMask notify_status = StatusMask::subscription_matched();
DataReaderListener* listener = get_listener_for(notify_status);
if (listener != nullptr)
Expand Down Expand Up @@ -1184,6 +1190,11 @@ RequestedIncompatibleQosStatus& DataReaderImpl::update_requested_incompatible_qo
LivelinessChangedStatus& DataReaderImpl::update_liveliness_status(
const fastrtps::LivelinessChangedStatus& status)
{
if (0 < status.not_alive_count_change)
{
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
}

liveliness_changed_status_.alive_count = status.alive_count;
liveliness_changed_status_.not_alive_count = status.not_alive_count;
liveliness_changed_status_.alive_count_change += status.alive_count_change;
Expand Down
105 changes: 55 additions & 50 deletions src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct ReadTakeCommand
SampleInfoSeq& sample_infos,
int32_t max_samples,
const StateFilter& states,
history_type::instance_info instance,
const history_type::instance_info& instance,
bool single_instance = false)
: type_(reader.type_)
, loan_manager_(reader.loan_manager_)
Expand Down Expand Up @@ -108,8 +108,8 @@ struct ReadTakeCommand
// Traverse changes on current instance
bool ret_val = false;
LoanableCollection::size_type first_slot = current_slot_;
auto it = instance_.second->begin();
while (!finished_ && it != instance_.second->end())
auto it = instance_.second->cache_changes.begin();
while (!finished_ && it != instance_.second->cache_changes.end())
{
CacheChange_t* change = *it;
SampleStateKind check;
Expand Down Expand Up @@ -142,10 +142,9 @@ struct ReadTakeCommand
// in the future also
if (!is_future_change)
{

// Add sample and info to collections
ReturnCode_t previous_return_value = return_value_;
bool added = add_sample(change, remove_change);
bool added = add_sample(*it, remove_change);
reader_->end_sample_access_nts(change, wp, added);

// Check if the payload is dirty
Expand Down Expand Up @@ -181,6 +180,7 @@ struct ReadTakeCommand

if (current_slot_ > first_slot)
{
instance_.second->view_state = ViewStateKind::NOT_NEW_VIEW_STATE;
ret_val = true;

// complete sample infos
Expand Down Expand Up @@ -208,6 +208,41 @@ struct ReadTakeCommand
return return_value_;
}

static void generate_info(
SampleInfo& info,
const DataReaderInstance& instance,
const DataReaderCacheChange& item)
{
info.sample_state = item->isRead ? READ_SAMPLE_STATE : NOT_READ_SAMPLE_STATE;
info.instance_state = instance.instance_state;
info.view_state = instance.view_state;
info.disposed_generation_count = item->reader_info.disposed_generation_count;
info.no_writers_generation_count = item->reader_info.no_writers_generation_count;
info.sample_rank = 0;
info.generation_rank = 0;
info.absolute_generation_rank = 0;
info.source_timestamp = item->sourceTimestamp;
info.reception_timestamp = item->reader_info.receptionTimestamp;
info.instance_handle = item->instanceHandle;
info.publication_handle = InstanceHandle_t(item->writerGUID);
info.sample_identity.writer_guid(item->writerGUID);
info.sample_identity.sequence_number(item->sequenceNumber);
info.related_sample_identity = item->write_params.sample_identity();
info.valid_data = true;

switch (item->kind)
{
case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED:
case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED:
case eprosima::fastrtps::rtps::NOT_ALIVE_UNREGISTERED:
info.valid_data = false;
break;
case eprosima::fastrtps::rtps::ALIVE:
default:
break;
}
}

private:

const TypeSupport& type_;
Expand Down Expand Up @@ -243,14 +278,15 @@ struct ReadTakeCommand

bool is_current_instance_valid()
{
// We are not implementing instance_state or view_state yet, so all instances will be considered to have
// a valid state. In the future this should check instance_state against states_.instance_states and
// view_state against states_.view_states
return true;
// Check instance_state against states_.instance_states and view_state against states_.view_states
auto instance_state = instance_.second->instance_state;
auto view_state = instance_.second->view_state;
return (0 != (states_.instance_states & instance_state)) && (0 != (states_.view_states & view_state));
}

bool next_instance()
{
history_.check_and_remove_instance(instance_);
if (single_instance_)
{
finished_ = true;
Expand All @@ -270,7 +306,7 @@ struct ReadTakeCommand
}

bool add_sample(
CacheChange_t* change,
const DataReaderCacheChange& item,
bool& deserialization_error)
{
bool ret_val = false;
Expand All @@ -284,22 +320,21 @@ struct ReadTakeCommand
sample_infos_.length(new_len);

// Add information
generate_info(change);
generate_info(item);
if (sample_infos_[current_slot_].valid_data)
{
if (!deserialize_sample(change))
if (!deserialize_sample(item))
{
// Decrement length of collections
data_values_.length(current_slot_);
sample_infos_.length(current_slot_);
deserialization_error = true;
return false;
}

// Mark that some data is available
return_value_ = ReturnCode_t::RETCODE_OK;
}

// Mark that some data is available
return_value_ = ReturnCode_t::RETCODE_OK;
++current_slot_;
--remaining_samples_;
ret_val = true;
Expand Down Expand Up @@ -330,48 +365,18 @@ struct ReadTakeCommand
}

void generate_info(
CacheChange_t* change)
const DataReaderCacheChange& item)
{
// Loan when necessary
if (!sample_infos_.has_ownership())
{
SampleInfo* item = info_pool_.get_item();
assert(item != nullptr);
const_cast<void**>(sample_infos_.buffer())[current_slot_] = item;
SampleInfo* pool_item = info_pool_.get_item();
assert(pool_item != nullptr);
const_cast<void**>(sample_infos_.buffer())[current_slot_] = pool_item;
}

SampleInfo& info = sample_infos_[current_slot_];
info.sample_state = change->isRead ? READ_SAMPLE_STATE : NOT_READ_SAMPLE_STATE;
info.view_state = NOT_NEW_VIEW_STATE;
info.disposed_generation_count = 0;
info.no_writers_generation_count = 1;
info.sample_rank = 0;
info.generation_rank = 0;
info.absoulte_generation_rank = 0;
info.source_timestamp = change->sourceTimestamp;
info.reception_timestamp = change->reader_info.receptionTimestamp;
info.instance_handle = handle_;
info.publication_handle = InstanceHandle_t(change->writerGUID);
info.sample_identity.writer_guid(change->writerGUID);
info.sample_identity.sequence_number(change->sequenceNumber);
info.related_sample_identity = change->write_params.sample_identity();
info.valid_data = true;

switch (change->kind)
{
case eprosima::fastrtps::rtps::ALIVE:
info.instance_state = ALIVE_INSTANCE_STATE;
break;
case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED:
case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED:
info.instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE;
info.valid_data = false;
break;
default:
//TODO [ILG] change this if the other kinds ever get implemented
info.instance_state = ALIVE_INSTANCE_STATE;
break;
}
generate_info(info, *instance_.second, item);
}

bool check_datasharing_validity(
Expand Down
Loading

0 comments on commit d03d6ea

Please sign in to comment.