Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding implementation for instance_state and view_state [12758] #2298

Merged
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
9d2f2d6
Refs 12400. Added DataReaderCacheChange.
MiguelCompany Aug 31, 2021
52b6c9f
Refs 12400. Added DataReaderInstance.
MiguelCompany Sep 1, 2021
7280d7a
Refs 12400. DataReaderHistory using new types.
MiguelCompany Sep 3, 2021
b2b1fae
Refs 12400. Removing unnecessary method from SubscriberHistory.
MiguelCompany Sep 7, 2021
4ee4f02
Refs 12400. ReadTakeCommand receives full instance information.
MiguelCompany Sep 7, 2021
e2ef28c
Refs 12400. ReadTakeCommand checks for instance states.
MiguelCompany Sep 7, 2021
da93a9d
Refs 12400. ReadTakeCommand fills sample info from instance data.
MiguelCompany Sep 7, 2021
0c1f59f
Refs 12400. Added insert method to ResourceLimitedVector.
MiguelCompany Sep 7, 2021
817dfd4
Refs 12400. DataReaderInstance uses ResourceLimitedVector.
MiguelCompany Sep 7, 2021
860b932
Refs 12400. get_first_untaken_info takes sample info from instance data.
MiguelCompany Sep 8, 2021
f868b4d
Refs 12400. Discard received change when older than oldest.
MiguelCompany Sep 8, 2021
bb5e29b
Refs 12400. Fixing KEEP_ALL with keys.
MiguelCompany Sep 8, 2021
0b2b047
Refs 12400. Refactor to always use instances.
MiguelCompany Sep 8, 2021
9c69a9a
Refs 12400. Basic structure for update instance state.
MiguelCompany Oct 26, 2021
b555617
Refs 12400. Adding alive_writers and current_owner to DataReaderInsta…
MiguelCompany Oct 26, 2021
0ae7cd8
Refs 12400. Implementing writer_alive.
MiguelCompany Oct 26, 2021
1284cbc
Refs 12400. Implementing writer_dispose.
MiguelCompany Oct 26, 2021
d863991
Refs 12400. Implementing writer_unregister.
MiguelCompany Oct 26, 2021
7f6f641
Refs 12400. Setting NOT_NEW on returned instances.
MiguelCompany Oct 26, 2021
664aca1
Refs 12400. Correct return code when returning samples with no data.
MiguelCompany Oct 27, 2021
570b43f
Refs 12400. Set view_state to NEW when changing instance_state to ALIVE.
MiguelCompany Oct 27, 2021
3020516
Refs 12400. Moving generation counts into CacheChange_t.
MiguelCompany Oct 27, 2021
48f7a44
Refs 12400. Assigning generation counts after processing instance state.
MiguelCompany Oct 27, 2021
3ee50bd
Refs 12400. Update instance_state when writer becomes not alive.
MiguelCompany Oct 27, 2021
50b8868
Refs 12400. Clear alive_writers when changing generation.
MiguelCompany Oct 27, 2021
2f61a2e
Refs 12400. Add writer_unmatched to ReaderHistory.
MiguelCompany Oct 27, 2021
5e57866
Refs 12400. NOT_ALIVE_UNREGISTERED should not return valid data.
MiguelCompany Oct 28, 2021
d3d0e3d
Refs 12400. Set autodispose_unregistered_instances to false on test.
MiguelCompany Oct 28, 2021
5b4c855
Refs 12400. Remove instance when it becomes empty and is not alive.
MiguelCompany Oct 28, 2021
2a9f55e
Refs 12400. Refactor into writer_not_alive.
MiguelCompany Oct 29, 2021
38ca299
Refs 12400. Keeping samples from unmatched writers.
MiguelCompany Oct 29, 2021
961408c
Refs 12400. Avoid keeping non-notified samples.
MiguelCompany Oct 29, 2021
14fcb19
Refs 12400. Linters.
MiguelCompany Oct 29, 2021
db2013c
Refs 12758. Fixing compilation warnings on windows.
Nov 23, 2021
af7667f
Refs 12758. Fixing assertion on WriterProxy logic. If we only notify …
Nov 29, 2021
9e6e9bb
Refs 12758 Fixing DataReaderHistory test that checks DataWriter dispo…
Nov 30, 2021
30adad9
Refs 12758. Use move semantics.
MiguelCompany Dec 7, 2021
2e9595e
Refs 12758. Use ResourceLimitedVector for writers.
MiguelCompany Dec 7, 2021
bd0d144
Refs 12758. Apply pre-allocation policies.
MiguelCompany Dec 7, 2021
f5f8fb5
Refs 12758. Uncrustify.
MiguelCompany Dec 7, 2021
737c241
Refs 12758. PubSubReader. Account for writer_guid on last_seq checks.
MiguelCompany Dec 7, 2021
c4d11dd
Refs 12758. Added can_change_be_added_nts.
MiguelCompany Dec 13, 2021
fb10bcb
Refs 12758. Removed unused method.
MiguelCompany Dec 13, 2021
0bd9fcb
Refs 12758. Always use completed changes for key computation.
MiguelCompany Dec 13, 2021
6ecf193
Refs 12758. Fixed ResourceLimitedVector::insert.
MiguelCompany Dec 13, 2021
134f9b1
Refs 12758. Uncrustify.
MiguelCompany Dec 13, 2021
1b42b23
Refs 12758. Avoid dynamic allocation inside remove_changes_with_pred.
MiguelCompany Dec 13, 2021
574622b
Refs 12758. Optimization on DataReaderHistory::remove_change_nts.
MiguelCompany Dec 13, 2021
a7f2ae1
Refs 12758. Method writer_unmatched documented and improved.
MiguelCompany Dec 13, 2021
6149ee3
Refs 12758. Do not complete changes for non-keyed topics.
MiguelCompany Dec 14, 2021
ec5336c
Refs 12758. Do not remove incomplete changes for keyed topics.
MiguelCompany Dec 15, 2021
64e52e5
Refs 12758. Different removal policy on ReaderHistory and DataReaderH…
MiguelCompany Dec 16, 2021
f956e80
Refs 12758. Fix unused parameter warning.
MiguelCompany Dec 16, 2021
134afc6
Refs 12758. Removed unused header.
MiguelCompany Dec 20, 2021
ed04235
Refs 12758. Doxydoc improvements.
MiguelCompany Dec 20, 2021
11fee0a
Refs 12758. Linters.
MiguelCompany Dec 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/fastdds/dds/subscriber/SampleInfo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ struct SampleInfo

//! the generation difference between the time the sample was received, and the time the most recent sample was received.
//! The most recent sample used for the calculation may or may not be in the returned collection
int32_t absoulte_generation_rank;
int32_t absolute_generation_rank;

//! time provided by the DataWriter when the sample was written
fastrtps::rtps::Time_t source_timestamp;
Expand Down
4 changes: 4 additions & 0 deletions include/fastdds/rtps/common/CacheChange.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ struct CacheChangeReaderInfo_t
{
//!Reception TimeStamp (only used in Readers)
Time_t receptionTimestamp;
//! Disposed generation of the instance when this entry was added to it
int32_t disposed_generation_count;
//! No-writers generation of the instance when this entry was added to it
int32_t no_writers_generation_count;
};

/**
Expand Down
28 changes: 28 additions & 0 deletions include/fastdds/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,19 @@ class ReaderHistory : public History
CacheChange_t** min_change,
const GUID_t& writerGuid);

/**
* Called when a writer is unmatched from the reader holding this history.
*
* This method will remove all the changes on the history that came from the writer being unmatched and which have
* not yet been notified to the user.
*
* @param writer_guid GUID of the writer being unmatched.
* @param last_notified_seq Last sequence number from the specified writer that was notified to the user.
*/
RTPS_DllAPI virtual void writer_unmatched(
const GUID_t& writer_guid,
const SequenceNumber_t& last_notified_seq);

protected:

RTPS_DllAPI bool do_reserve_cache(
Expand All @@ -161,6 +174,21 @@ class ReaderHistory : public History
RTPS_DllAPI void do_release_cache(
CacheChange_t* ch) override;

template<typename Pred>
inline void remove_changes_with_pred(
Pred pred)
{
assert(nullptr != mp_reader);
assert(nullptr != mp_mutex);

std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
std::vector<CacheChange_t*>::iterator new_end = std::remove_if(m_changes.begin(), m_changes.end(), pred);
while (new_end != m_changes.end())
{
new_end = remove_change_nts(new_end);
}
}

//!Pointer to the reader
RTPSReader* mp_reader;

Expand Down
21 changes: 0 additions & 21 deletions include/fastrtps/subscriber/SubscriberHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ class SubscriberHistory : public rtps::ReaderHistory
{
public:

using instance_info = std::pair<rtps::InstanceHandle_t, std::vector<rtps::CacheChange_t*>*>;

/**
* Constructor. Requires information about the subscriber.
* @param topic_att TopicAttributes.
Expand Down Expand Up @@ -179,25 +177,6 @@ class SubscriberHistory : public rtps::ReaderHistory
rtps::InstanceHandle_t& handle,
std::chrono::steady_clock::time_point& next_deadline_us);

/**
* @brief Get the list of changes corresponding to an instance handle.
* @param handle The handle to the instance.
* @param exact Indicates if the handle should match exactly (true) or if the first instance greater than the
* input handle should be returned.
* @return A pair where:
* - @c first is a boolean indicating if an instance was found
* - @c second is a pair where:
* - @c first is the handle of the returned instance
* - @c second is a pointer to a std::vector<rtps::CacheChange_t*> with the list of changes for the
* returned instance
*
* @remarks When used on a NO_KEY topic, an instance will only be returned when called with
* `handle = HANDLE_NIL` and `exact = false`.
*/
std::pair<bool, instance_info> lookup_instance(
const rtps::InstanceHandle_t& handle,
bool exact);

private:

using t_m_Inst_Caches = std::map<rtps::InstanceHandle_t, KeyedChanges>;
Expand Down
41 changes: 41 additions & 0 deletions include/fastrtps/utils/collections/ResourceLimitedVector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,47 @@ class ResourceLimitedVector
return *this;
}

/**
* Insert value before pos.
*
* @param pos iterator before which the content will be inserted. pos may be the end() iterator.
* @param value element value to insert.
*
* @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits.
*/
iterator insert(
const_iterator pos,
const value_type& value)
{
auto dist = std::distance(collection_.cbegin(), pos);
if (!ensure_capacity())
{
return end();
}

return collection_.insert(collection_.cbegin() + dist, value);
}

/**
* Insert value before pos.
*
* @param pos iterator before which the content will be inserted. pos may be the end() iterator.
* @param value element value to insert.
*
* @return Iterator pointing to the inserted value. end() if insertion couldn't be done due to collection limits.
*/
iterator insert(
const_iterator pos,
value_type&& value)
{
if (!ensure_capacity())
{
return end();
}

return collection_.insert(pos, std::move(value));
}

/**
* Add element at the end.
*
Expand Down
15 changes: 13 additions & 2 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -841,10 +841,10 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos(
bool DataReaderImpl::on_new_cache_change_added(
const CacheChange_t* const change)
{
std::lock_guard<RecursiveTimedMutex> guard(reader_->getMutex());

if (qos_.deadline().period != c_TimeInfinite)
{
std::unique_lock<RecursiveTimedMutex> lock(reader_->getMutex());

if (!history_.set_next_deadline(
change->instanceHandle,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
Expand All @@ -862,6 +862,7 @@ bool DataReaderImpl::on_new_cache_change_added(
}

CacheChange_t* new_change = const_cast<CacheChange_t*>(change);
history_.update_instance_nts(new_change);

if (qos_.lifespan().duration == c_TimeInfinite)
{
Expand Down Expand Up @@ -917,6 +918,11 @@ void DataReaderImpl::update_subscription_matched_status(
subscription_matched_status_.last_publication_handle = status.last_publication_handle;
}

if (count_change < 0)
{
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
}

StatusMask notify_status = StatusMask::subscription_matched();
DataReaderListener* listener = get_listener_for(notify_status);
if (listener != nullptr)
Expand Down Expand Up @@ -1184,6 +1190,11 @@ RequestedIncompatibleQosStatus& DataReaderImpl::update_requested_incompatible_qo
LivelinessChangedStatus& DataReaderImpl::update_liveliness_status(
const fastrtps::LivelinessChangedStatus& status)
{
if (0 < status.not_alive_count_change)
{
history_.writer_not_alive(iHandle2GUID(status.last_publication_handle));
}

liveliness_changed_status_.alive_count = status.alive_count;
liveliness_changed_status_.not_alive_count = status.not_alive_count;
liveliness_changed_status_.alive_count_change += status.alive_count_change;
Expand Down
105 changes: 55 additions & 50 deletions src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct ReadTakeCommand
SampleInfoSeq& sample_infos,
int32_t max_samples,
const StateFilter& states,
history_type::instance_info instance,
const history_type::instance_info& instance,
bool single_instance = false)
: type_(reader.type_)
, loan_manager_(reader.loan_manager_)
Expand Down Expand Up @@ -108,8 +108,8 @@ struct ReadTakeCommand
// Traverse changes on current instance
bool ret_val = false;
LoanableCollection::size_type first_slot = current_slot_;
auto it = instance_.second->begin();
while (!finished_ && it != instance_.second->end())
auto it = instance_.second->cache_changes.begin();
while (!finished_ && it != instance_.second->cache_changes.end())
{
CacheChange_t* change = *it;
SampleStateKind check;
Expand Down Expand Up @@ -142,10 +142,9 @@ struct ReadTakeCommand
// in the future also
if (!is_future_change)
{

// Add sample and info to collections
ReturnCode_t previous_return_value = return_value_;
bool added = add_sample(change, remove_change);
bool added = add_sample(*it, remove_change);
reader_->end_sample_access_nts(change, wp, added);

// Check if the payload is dirty
Expand Down Expand Up @@ -181,6 +180,7 @@ struct ReadTakeCommand

if (current_slot_ > first_slot)
{
instance_.second->view_state = ViewStateKind::NOT_NEW_VIEW_STATE;
ret_val = true;

// complete sample infos
Expand Down Expand Up @@ -208,6 +208,41 @@ struct ReadTakeCommand
return return_value_;
}

static void generate_info(
SampleInfo& info,
const DataReaderInstance& instance,
const DataReaderCacheChange& item)
{
info.sample_state = item->isRead ? READ_SAMPLE_STATE : NOT_READ_SAMPLE_STATE;
info.instance_state = instance.instance_state;
info.view_state = instance.view_state;
info.disposed_generation_count = item->reader_info.disposed_generation_count;
info.no_writers_generation_count = item->reader_info.no_writers_generation_count;
info.sample_rank = 0;
info.generation_rank = 0;
info.absolute_generation_rank = 0;
info.source_timestamp = item->sourceTimestamp;
info.reception_timestamp = item->reader_info.receptionTimestamp;
info.instance_handle = item->instanceHandle;
info.publication_handle = InstanceHandle_t(item->writerGUID);
info.sample_identity.writer_guid(item->writerGUID);
info.sample_identity.sequence_number(item->sequenceNumber);
info.related_sample_identity = item->write_params.sample_identity();
info.valid_data = true;

switch (item->kind)
{
case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED:
case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED:
case eprosima::fastrtps::rtps::NOT_ALIVE_UNREGISTERED:
info.valid_data = false;
break;
case eprosima::fastrtps::rtps::ALIVE:
default:
break;
}
}

private:

const TypeSupport& type_;
Expand Down Expand Up @@ -243,14 +278,15 @@ struct ReadTakeCommand

bool is_current_instance_valid()
{
// We are not implementing instance_state or view_state yet, so all instances will be considered to have
// a valid state. In the future this should check instance_state against states_.instance_states and
// view_state against states_.view_states
return true;
// Check instance_state against states_.instance_states and view_state against states_.view_states
auto instance_state = instance_.second->instance_state;
auto view_state = instance_.second->view_state;
return (0 != (states_.instance_states & instance_state)) && (0 != (states_.view_states & view_state));
}

bool next_instance()
{
history_.check_and_remove_instance(instance_);
if (single_instance_)
{
finished_ = true;
Expand All @@ -270,7 +306,7 @@ struct ReadTakeCommand
}

bool add_sample(
CacheChange_t* change,
const DataReaderCacheChange& item,
bool& deserialization_error)
{
bool ret_val = false;
Expand All @@ -284,22 +320,21 @@ struct ReadTakeCommand
sample_infos_.length(new_len);

// Add information
generate_info(change);
generate_info(item);
if (sample_infos_[current_slot_].valid_data)
{
if (!deserialize_sample(change))
if (!deserialize_sample(item))
{
// Decrement length of collections
data_values_.length(current_slot_);
sample_infos_.length(current_slot_);
deserialization_error = true;
return false;
}

// Mark that some data is available
return_value_ = ReturnCode_t::RETCODE_OK;
}

// Mark that some data is available
return_value_ = ReturnCode_t::RETCODE_OK;
++current_slot_;
--remaining_samples_;
ret_val = true;
Expand Down Expand Up @@ -330,48 +365,18 @@ struct ReadTakeCommand
}

void generate_info(
CacheChange_t* change)
const DataReaderCacheChange& item)
{
// Loan when necessary
if (!sample_infos_.has_ownership())
{
SampleInfo* item = info_pool_.get_item();
assert(item != nullptr);
const_cast<void**>(sample_infos_.buffer())[current_slot_] = item;
SampleInfo* pool_item = info_pool_.get_item();
assert(pool_item != nullptr);
const_cast<void**>(sample_infos_.buffer())[current_slot_] = pool_item;
}

SampleInfo& info = sample_infos_[current_slot_];
info.sample_state = change->isRead ? READ_SAMPLE_STATE : NOT_READ_SAMPLE_STATE;
info.view_state = NOT_NEW_VIEW_STATE;
info.disposed_generation_count = 0;
info.no_writers_generation_count = 1;
info.sample_rank = 0;
info.generation_rank = 0;
info.absoulte_generation_rank = 0;
info.source_timestamp = change->sourceTimestamp;
info.reception_timestamp = change->reader_info.receptionTimestamp;
info.instance_handle = handle_;
info.publication_handle = InstanceHandle_t(change->writerGUID);
info.sample_identity.writer_guid(change->writerGUID);
info.sample_identity.sequence_number(change->sequenceNumber);
info.related_sample_identity = change->write_params.sample_identity();
info.valid_data = true;

switch (change->kind)
{
case eprosima::fastrtps::rtps::ALIVE:
info.instance_state = ALIVE_INSTANCE_STATE;
break;
case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED:
case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED:
info.instance_state = NOT_ALIVE_DISPOSED_INSTANCE_STATE;
info.valid_data = false;
break;
default:
//TODO [ILG] change this if the other kinds ever get implemented
info.instance_state = ALIVE_INSTANCE_STATE;
break;
}
generate_info(info, *instance_.second, item);
}

bool check_datasharing_validity(
Expand Down
Loading