Skip to content

Commit

Permalink
#Refs #20257: Fix
Browse files Browse the repository at this point in the history
Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
  • Loading branch information
Mario-DL committed Jan 24, 2024
1 parent a471216 commit 5059129
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 19 deletions.
35 changes: 16 additions & 19 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ DataReaderHistory::DataReaderHistory(
compute_key_for_change_fn_ =
[this](CacheChange_t* a_change)
{
if (a_change->instanceHandle.isDefined())
if (!a_change->is_fully_assembled())
{
return true;
return false;
}

if (!a_change->is_fully_assembled())
if (a_change->instanceHandle.isDefined())
{
return false;
return true;
}

if (type_ != nullptr)
Expand Down Expand Up @@ -736,27 +736,24 @@ bool DataReaderHistory::completed_change(
size_t unknown_missing_changes_up_to,
SampleRejectedStatusKind& rejection_reason)
{
bool ret_value = true;
rejection_reason = NOT_REJECTED;
bool ret_value = false;
rejection_reason = REJECTED_BY_INSTANCES_LIMIT;

if (!change->instanceHandle.isDefined())
if (compute_key_for_change_fn_(change))
{
ret_value = false;
if (compute_key_for_change_fn_(change))
InstanceCollection::iterator vit;
if (find_key(change->instanceHandle, vit))
{
InstanceCollection::iterator vit;
if (find_key(change->instanceHandle, vit))
{
ret_value = !change->instanceHandle.isDefined() ||
complete_fn_(change, *vit->second, unknown_missing_changes_up_to, rejection_reason);
}
else
{
rejection_reason = REJECTED_BY_INSTANCES_LIMIT;
}
ret_value = !change->instanceHandle.isDefined() ||
complete_fn_(change, *vit->second, unknown_missing_changes_up_to, rejection_reason);
}
}

if (ret_value)
{
rejection_reason = NOT_REJECTED;
}

return ret_value;
}

Expand Down
7 changes: 7 additions & 0 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ bool StatefulReader::processDataFragMsg(
{
work_change->copy_not_memcpy(change_to_add);
work_change->serializedPayload.length = sampleSize;
work_change->instanceHandle = c_InstanceHandle_Unknown;
work_change->setFragmentSize(change_to_add->getFragmentSize(), true);
change_created = work_change;
}
Expand All @@ -733,6 +734,12 @@ bool StatefulReader::processDataFragMsg(
{
work_change->add_fragments(change_to_add->serializedPayload, fragmentStartingNum,
fragmentsInSubmessage);

// Set the instanceHandle only when fragment number 1 is received
if (!work_change->instanceHandle.isDefined() && fragmentStartingNum == 1)
{
work_change->instanceHandle = change_to_add->instanceHandle;
}
}

// If this is the first time we have received fragments for this change, add it to history
Expand Down
7 changes: 7 additions & 0 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ bool StatelessReader::processDataFragMsg(
{
work_change->copy_not_memcpy(change_to_add);
work_change->serializedPayload.length = sampleSize;
work_change->instanceHandle = c_InstanceHandle_Unknown;
work_change->setFragmentSize(change_to_add->getFragmentSize(), true);
}
}
Expand All @@ -754,6 +755,12 @@ bool StatelessReader::processDataFragMsg(
change_completed = work_change;
work_change = nullptr;
}

// Set the instanceHandle only when fragment number 1 is received
if (!work_change->instanceHandle.isDefined() && fragmentStartingNum == 1)
{
work_change->instanceHandle = change_to_add->instanceHandle;
}
}

writer.fragmented_change = work_change;
Expand Down

0 comments on commit 5059129

Please sign in to comment.