Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incosistences between Pub/Sub history and RTPS history [12620] #2239

Merged
merged 7 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fastdds/rtps/history/History.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class History
RTPS_DllAPI virtual bool matches_change(
const CacheChange_t* ch_inner,
CacheChange_t* ch_outer);

/**
* Remove a specific change from the history.
* @param removal iterator to the CacheChange_t to remove.
Expand Down
13 changes: 13 additions & 0 deletions include/fastdds/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,19 @@ class ReaderHistory : public History
CacheChange_t* change,
size_t);

/**
* Called when a fragmented change is received completely by the Subscriber. Will find its instance and store it.
* @pre Change should be already present in the history.
* @param[in] change The received change
* @return
*/
RTPS_DllAPI bool virtual completed_change(
rtps::CacheChange_t* change)
{
(void)change;
return true;
}

/**
* Add a CacheChange_t to the ReaderHistory.
* @param a_change Pointer to the CacheChange to add.
Expand Down
22 changes: 20 additions & 2 deletions include/fastrtps/subscriber/SubscriberHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ class SubscriberHistory : public rtps::ReaderHistory
rtps::CacheChange_t* change,
size_t unknown_missing_changes_up_to) override;

/**
* Called when a fragmented change is received completely by the Subscriber. Will find its instance and store it.
* @pre Change should be already present in the history.
* @param[in] change The received change
* @return
*/
bool completed_change(
rtps::CacheChange_t* change);

/** @name Read or take data methods.
* Methods to read or take data from the History.
* @param data Pointer to the object where you want to read or take the information.
Expand Down Expand Up @@ -194,15 +203,18 @@ class SubscriberHistory : public rtps::ReaderHistory
/// Function processing a received change
std::function<bool(rtps::CacheChange_t*, size_t)> receive_fn_;

/// Function processing a completed fragmented change
std::function<bool(rtps::CacheChange_t*)> complete_fn_;

/**
* @brief Method that finds a key in m_keyedChanges or tries to add it if not found
* @param a_change The change to get the key from
* @param map_it A map iterator to the given key
* @param[out] map_it A map iterator to the given key
* @return True if it was found or could be added to the map
*/
bool find_key(
rtps::CacheChange_t* a_change,
t_m_Inst_Caches::iterator* map_it);
t_m_Inst_Caches::iterator& map_it);

/**
* @brief Method that finds a key in m_keyedChanges or tries to add it if not found
Expand Down Expand Up @@ -237,6 +249,12 @@ class SubscriberHistory : public rtps::ReaderHistory
bool received_change_keep_last_with_key(
rtps::CacheChange_t* change,
size_t unknown_missing_changes_up_to);

bool completed_change_keep_all_with_key(
rtps::CacheChange_t* change);

bool completed_change_keep_last_with_key(
rtps::CacheChange_t* change);
///@}

bool add_received_change(
Expand Down
221 changes: 191 additions & 30 deletions src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ SubscriberHistory::SubscriberHistory(
receive_fn_ = topic_att.historyQos.kind == KEEP_ALL_HISTORY_QOS ?
std::bind(&SubscriberHistory::received_change_keep_all_with_key, this, _1, _2) :
std::bind(&SubscriberHistory::received_change_keep_last_with_key, this, _1, _2);
complete_fn_ = topic_att.historyQos.kind == KEEP_ALL_HISTORY_QOS ?
std::bind(&SubscriberHistory::completed_change_keep_all_with_key, this, _1) :
std::bind(&SubscriberHistory::completed_change_keep_last_with_key, this, _1);
}
}

Expand Down Expand Up @@ -192,49 +195,93 @@ bool SubscriberHistory::received_change_keep_all_with_key(
{
// TODO(Miguel C): Should we check unknown_missing_changes_up_to as it is done in received_change_keep_all_no_key?

bool ret_value = false;
t_m_Inst_Caches::iterator vit;
if (find_key_for_change(a_change, vit))
if (a_change->instanceHandle.isDefined() || a_change->is_fully_assembled()) // In this case we can obtain the the key.
{
std::vector<CacheChange_t*>& instance_changes = vit->second.cache_changes;
if (instance_changes.size() < static_cast<size_t>(resource_limited_qos_.max_samples_per_instance))
if (find_key_for_change(a_change, vit))
{
return add_received_change_with_key(a_change, vit->second.cache_changes);
std::vector<CacheChange_t*>& instance_changes = vit->second.cache_changes;
if (instance_changes.size() < static_cast<size_t>(resource_limited_qos_.max_samples_per_instance))
{
ret_value = add_received_change_with_key(a_change, vit->second.cache_changes);
}
else
{
logWarning(SUBSCRIBER, "Change not added due to maximum number of samples per instance");
}
}
}
else // Store the sample temporally only in ReaderHistory. When completed it will be stored in SubscriberHistory too.
{
if (!m_isHistoryFull)
{
ret_value = add_change(a_change);

logWarning(SUBSCRIBER, "Change not added due to maximum number of samples per instance");
if (m_changes.size() == static_cast<size_t>(m_att.maximumReservedCaches))
{
m_isHistoryFull = true;
}
}
else
{
// Discarting the sample.
logWarning(SUBSCRIBER, "Attempting to add Data to Full ReaderHistory: " << topic_att_.getTopicDataType());
}
}

return false;
return ret_value;
}

bool SubscriberHistory::received_change_keep_last_with_key(
CacheChange_t* a_change,
size_t /* unknown_missing_changes_up_to */)
{
bool ret_value = false;
t_m_Inst_Caches::iterator vit;
if (find_key_for_change(a_change, vit))
if (a_change->instanceHandle.isDefined() || a_change->is_fully_assembled()) // In this case we can obtain the the key.
{
bool add = false;
std::vector<CacheChange_t*>& instance_changes = vit->second.cache_changes;
if (instance_changes.size() < static_cast<size_t>(history_qos_.depth))
if (find_key_for_change(a_change, vit))
{
add = true;
bool add = false;
std::vector<CacheChange_t*>& instance_changes = vit->second.cache_changes;
if (instance_changes.size() < static_cast<size_t>(history_qos_.depth))
{
add = true;
}
else
{
// Try to substitute the oldest sample.

// As the instance should be ordered following the presentation QoS, we can always remove the first one.
add = remove_change_sub(instance_changes.at(0));
}

if (add)
{
ret_value = add_received_change_with_key(a_change, instance_changes);
}
}
else
}
else // Store the sample temporally only in ReaderHistory. When completed it will be stored in SubscriberHistory too.
{
if (!m_isHistoryFull)
{
// Try to substitute the oldest sample.
ret_value = add_change(a_change);

// As the instance should be ordered following the presentation QoS, we can always remove the first one.
add = remove_change_sub(instance_changes.at(0));
if (m_changes.size() == static_cast<size_t>(m_att.maximumReservedCaches))
{
m_isHistoryFull = true;
}
}

if (add)
else
{
return add_received_change_with_key(a_change, instance_changes);
// Discarting the sample.
logWarning(SUBSCRIBER, "Attempting to add Data to Full ReaderHistory: " << topic_att_.getTopicDataType());
}
}

return false;
return ret_value;
}

bool SubscriberHistory::add_received_change(
Expand Down Expand Up @@ -323,7 +370,7 @@ bool SubscriberHistory::find_key_for_change(
return false;
}

return find_key(a_change, &map_it);
return find_key(a_change, map_it);
}

bool SubscriberHistory::deserialize_change(
Expand Down Expand Up @@ -443,29 +490,27 @@ bool SubscriberHistory::get_first_untaken_info(

bool SubscriberHistory::find_key(
CacheChange_t* a_change,
t_m_Inst_Caches::iterator* vit_out)
t_m_Inst_Caches::iterator& vit_out)
{
t_m_Inst_Caches::iterator vit;
vit = keyed_changes_.find(a_change->instanceHandle);
if (vit != keyed_changes_.end())
vit_out = keyed_changes_.find(a_change->instanceHandle);
if (vit_out != keyed_changes_.end())
{
*vit_out = vit;
return true;
}

if (keyed_changes_.size() < static_cast<size_t>(resource_limited_qos_.max_instances))
{
*vit_out = keyed_changes_.insert(std::make_pair(a_change->instanceHandle, KeyedChanges())).first;
vit_out = keyed_changes_.insert(std::make_pair(a_change->instanceHandle, KeyedChanges())).first;
return true;
}
else
{
for (vit = keyed_changes_.begin(); vit != keyed_changes_.end(); ++vit)
for (t_m_Inst_Caches::iterator vit = keyed_changes_.begin(); vit != keyed_changes_.end(); ++vit)
{
if (vit->second.cache_changes.size() == 0)
{
keyed_changes_.erase(vit);
*vit_out = keyed_changes_.insert(std::make_pair(a_change->instanceHandle, KeyedChanges())).first;
vit_out = keyed_changes_.insert(std::make_pair(a_change->instanceHandle, KeyedChanges())).first;
return true;
}
}
Expand All @@ -488,7 +533,7 @@ bool SubscriberHistory::remove_change_sub(
{
bool found = false;
t_m_Inst_Caches::iterator vit;
if (find_key(change, &vit))
if (find_key(change, vit))
{
for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit)
{
Expand Down Expand Up @@ -530,7 +575,7 @@ bool SubscriberHistory::remove_change_sub(
{
bool found = false;
t_m_Inst_Caches::iterator vit;
if (find_key(change, &vit))
if (find_key(change, vit))
{
for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit)
{
Expand Down Expand Up @@ -707,5 +752,121 @@ ReaderHistory::iterator SubscriberHistory::remove_change_nts(
return ReaderHistory::remove_change_nts(removal, release);
}

bool SubscriberHistory::completed_change(
rtps::CacheChange_t* change)
{
bool ret_value = true;

if (complete_fn_)
{
ret_value = complete_fn_(change);
}

return ret_value;
}

bool SubscriberHistory::completed_change_keep_all_with_key(
CacheChange_t* a_change)
{
bool ret_value = false;

if (!a_change->instanceHandle.isDefined())
{
t_m_Inst_Caches::iterator vit;
if (find_key_for_change(a_change, vit))
{
std::vector<CacheChange_t*>& instance_changes = vit->second.cache_changes;
if (instance_changes.size() < static_cast<size_t>(resource_limited_qos_.max_samples_per_instance))
{
//ADD TO KEY VECTOR
eprosima::utilities::collections::sorted_vector_insert(instance_changes, a_change,
[](const CacheChange_t* lhs, const CacheChange_t* rhs)
{
return lhs->sourceTimestamp < rhs->sourceTimestamp;
});
ret_value = true;

logInfo(SUBSCRIBER, mp_reader->getGuid().entityId
<< ": Change " << a_change->sequenceNumber << " added from: "
<< a_change->writerGUID << " with KEY: " << a_change->instanceHandle; );
}
else
{
logWarning(SUBSCRIBER, "Change not added due to maximum number of samples per instance");

const_iterator chit = find_change_nts(a_change);
if (chit != changesEnd())
{
m_isHistoryFull = false;
remove_change_nts(chit);
}
else
{
logError(RTPS_WRITER_HISTORY, "Change should exists but didn't find it");
}
}
}
}

return ret_value;
}

bool SubscriberHistory::completed_change_keep_last_with_key(
CacheChange_t* a_change)
{
bool ret_value = false;

if (!a_change->instanceHandle.isDefined())
{
t_m_Inst_Caches::iterator vit;
if (find_key_for_change(a_change, vit))
{
bool add = false;
std::vector<CacheChange_t*>& instance_changes = vit->second.cache_changes;
if (instance_changes.size() < static_cast<size_t>(history_qos_.depth))
{
add = true;
}
else
{
// Try to substitute the oldest sample.

// As the instance should be ordered following the presentation QoS, we can always remove the first one.
add = remove_change_sub(instance_changes.at(0));
}

if (add)
{
//ADD TO KEY VECTOR
eprosima::utilities::collections::sorted_vector_insert(instance_changes, a_change,
[](const CacheChange_t* lhs, const CacheChange_t* rhs)
{
return lhs->sourceTimestamp < rhs->sourceTimestamp;
});
ret_value = true;

logInfo(SUBSCRIBER, mp_reader->getGuid().entityId
<< ": Change " << a_change->sequenceNumber << " added from: "
<< a_change->writerGUID << " with KEY: " << a_change->instanceHandle; );
}
else
{
const_iterator chit = find_change_nts(a_change);
if (chit != changesEnd())
{
m_isHistoryFull = false;
remove_change_nts(chit);
}
else
{
logError(RTPS_WRITER_HISTORY, "Change should exists but didn't find it");
}
}
}
}

return ret_value;
}

} // namespace fastrtps
} // namsepace eprosima
4 changes: 1 addition & 3 deletions src/cpp/rtps/history/ReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,7 @@ bool ReaderHistory::remove_fragmented_changes_until(
if (item->is_fully_assembled() == false)
{
logInfo(RTPS_READER_HISTORY, "Removing change " << item->sequenceNumber);
mp_reader->change_removed_by_history(item);
mp_reader->releaseCache(item);
chit = m_changes.erase(chit);
chit = remove_change_nts(chit);
continue;
}
}
Expand Down
Loading