Skip to content

Commit

Permalink
Propagate errors on take_next_data (#1022)
Browse files Browse the repository at this point in the history
This is a port of #962 from 1.9.x

SubscriberHistory::deserialize_change must return the result of the deserialization
and SubscriberHistory::takeNextData should propagate errors on deserialization and removal from history
  • Loading branch information
IkerLuengo authored Feb 21, 2020
1 parent b4f8d12 commit 3d18a54
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
2 changes: 1 addition & 1 deletion include/fastrtps/subscriber/SubscriberHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class SubscriberHistory: public rtps::ReaderHistory
rtps::CacheChange_t* a_change,
std::vector<rtps::CacheChange_t*>& instance_changes);

void deserialize_change(
bool deserialize_change(
rtps::CacheChange_t* change,
uint32_t ownership_strength,
void* data,
Expand Down
38 changes: 21 additions & 17 deletions src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ bool SubscriberHistory::received_change(
{
if (mp_reader == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY, "You need to create a Reader with this History before using it");
logError(SUBSCRIBER, "You need to create a Reader with this History before using it");
return false;
}

Expand Down Expand Up @@ -260,7 +260,7 @@ bool SubscriberHistory::find_key_for_change(
{
if (!a_change->instanceHandle.isDefined() && type_ != nullptr)
{
logInfo(RTPS_HISTORY, "Getting Key of change with no Key transmitted")
logInfo(SUBSCRIBER, "Getting Key of change with no Key transmitted")
type_->deserialize(&a_change->serializedPayload, get_key_object_);
bool is_key_protected = false;
#if HAVE_SECURITY
Expand All @@ -273,23 +273,27 @@ bool SubscriberHistory::find_key_for_change(
}
else if (!a_change->instanceHandle.isDefined())
{
logWarning(RTPS_HISTORY, "NO KEY in topic: " << topic_att_.topicName
logWarning(SUBSCRIBER, "NO KEY in topic: " << topic_att_.topicName
<< " and no method to obtain it";);
return false;
}

return find_key(a_change, &map_it);
}

void SubscriberHistory::deserialize_change(
bool SubscriberHistory::deserialize_change(
CacheChange_t* change,
uint32_t ownership_strength,
void* data,
SampleInfo_t* info)
{
if (change->kind == ALIVE)
{
type_->deserialize(&change->serializedPayload, data);
if (!type_->deserialize(&change->serializedPayload, data))
{
logError(SUBSCRIBER, "Deserialization of data failed");
return false;
}
}

if (info != nullptr)
Expand All @@ -312,6 +316,8 @@ void SubscriberHistory::deserialize_change(
info->iHandle = change->instanceHandle;
info->related_sample_identity = change->write_params.sample_identity();
}

return true;
}

bool SubscriberHistory::readNextData(
Expand All @@ -321,7 +327,7 @@ bool SubscriberHistory::readNextData(
{
if (mp_reader == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY, "You need to create a Reader with this History before using it");
logError(SUBSCRIBER, "You need to create a Reader with this History before using it");
return false;
}

Expand All @@ -336,8 +342,7 @@ bool SubscriberHistory::readNextData(
logInfo(SUBSCRIBER, mp_reader->getGuid().entityId << ": reading " << change->sequenceNumber);
uint32_t ownership = wp && qos_.m_ownership.kind == EXCLUSIVE_OWNERSHIP_QOS ?
wp->ownership_strength() : 0;
deserialize_change(change, ownership, data, info);
return true;
return deserialize_change(change, ownership, data, info);
}
}
return false;
Expand All @@ -351,7 +356,7 @@ bool SubscriberHistory::takeNextData(
{
if (mp_reader == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY, "You need to create a Reader with this History before using it");
logError(SUBSCRIBER, "You need to create a Reader with this History before using it");
return false;
}

Expand All @@ -367,9 +372,9 @@ bool SubscriberHistory::takeNextData(
" from writer: " << change->writerGUID);
uint32_t ownership = wp && qos_.m_ownership.kind == EXCLUSIVE_OWNERSHIP_QOS ?
wp->ownership_strength() : 0;
deserialize_change(change, ownership, data, info);
remove_change_sub(change);
return true;
bool deserialized = deserialize_change(change, ownership, data, info);
bool removed = remove_change_sub(change);
return (deserialized && removed);
}
}

Expand Down Expand Up @@ -409,13 +414,12 @@ bool SubscriberHistory::find_key(
return false;
}


bool SubscriberHistory::remove_change_sub(
CacheChange_t* change)
{
if (mp_reader == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY, "You need to create a Reader with this History before using it");
logError(SUBSCRIBER, "You need to create a Reader with this History before using it");
return false;
}

Expand Down Expand Up @@ -449,7 +453,7 @@ bool SubscriberHistory::remove_change_sub(
}
}
}
logError(SUBSCRIBER, "Change not found, something is wrong");
logError(SUBSCRIBER, "Change not found on this key, something is wrong");
}
return false;
}
Expand All @@ -460,7 +464,7 @@ bool SubscriberHistory::set_next_deadline(
{
if (mp_reader == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY, "You need to create a Reader with this History before using it");
logError(SUBSCRIBER, "You need to create a Reader with this History before using it");
return false;
}
std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
Expand Down Expand Up @@ -490,7 +494,7 @@ bool SubscriberHistory::get_next_deadline(
{
if (mp_reader == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY, "You need to create a Reader with this History before using it");
logError(SUBSCRIBER, "You need to create a Reader with this History before using it");
return false;
}
std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
Expand Down

0 comments on commit 3d18a54

Please sign in to comment.