Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/intraprocess datasharing [10845] #1817

Merged
merged 26 commits into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
83d758a
Refs 10731. DataSharingListener provides access to the pool of a matc…
IkerLuengo Mar 3, 2021
b69e006
Refs 10731. The writer prioritizes intraprocess over datasharing deli…
IkerLuengo Mar 3, 2021
2dbde27
Refs 10731. The reader prioritizes intraprocess over datasharing deli…
IkerLuengo Mar 3, 2021
516ab0d
Refs 10731. Payloads not in the history of writer are considered reus…
IkerLuengo Mar 8, 2021
01a36f7
Refs 10731. Move override check on the reader to ReadTakeCommand
IkerLuengo Mar 8, 2021
c5810c7
Refs 10731. Remove override check on best effort
IkerLuengo Mar 8, 2021
1fe0655
Refs 10731. Check deserialization errors on reader
IkerLuengo Mar 8, 2021
40a5362
Refs 10731. Avoid lock on writer when getting payload
IkerLuengo Mar 8, 2021
270c73e
Refs 10731. Test deserialization errors
IkerLuengo Mar 8, 2021
e395020
Refs 10731. Adapt DataSharing tests to new implementation
IkerLuengo Mar 8, 2021
aad2c91
Refs 10731. Remove reusability notifications to writer
IkerLuengo Mar 8, 2021
de6478a
Refs 10731. Improve check of data validity on reader
IkerLuengo Mar 9, 2021
d7acec8
Refs 10731. Modify test to new behavior
IkerLuengo Mar 9, 2021
20ed930
Refs 10731. fixup remove override check on RTPS
IkerLuengo Mar 9, 2021
45a8377
Refs 10731. fix mocks
IkerLuengo Mar 10, 2021
87cab85
Refs 10731. uncrustify
IkerLuengo Mar 10, 2021
78de1ee
Refs 10731. Suggested changes
IkerLuengo Mar 16, 2021
aa78976
Refs 10731. Keep datasharing compatibility on the writer info
IkerLuengo Mar 16, 2021
e54dc09
Refs 10731. No error when requested loan size is zero
IkerLuengo Mar 16, 2021
8afe55c
Refs 10731. Apply suggestions
IkerLuengo Mar 18, 2021
f556e1f
Refs 10731. Atomic sequence number on datasharing node
IkerLuengo Mar 18, 2021
ad1ae2e
Refs 10731. Protect the notification with the mutex
IkerLuengo Mar 23, 2021
4fa23f0
Refs 10731. Catch deserialization exceptions
IkerLuengo Mar 29, 2021
6856d29
Refs 10731. Make sure linters do not complain of void returns
IkerLuengo Mar 29, 2021
21d98a9
Refs 10731. Uncrustify
IkerLuengo Mar 29, 2021
10eca52
Refs 10731. Remove unused argument
IkerLuengo Mar 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions include/fastdds/dds/topic/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,7 @@ class TypeSupport : public std::shared_ptr<fastdds::dds::TopicDataType>
*/
RTPS_DllAPI virtual bool serialize(
void* data,
fastrtps::rtps::SerializedPayload_t* payload)
{
return get()->serialize(data, payload);
}
fastrtps::rtps::SerializedPayload_t* payload);

/**
* @brief Deserializes the data
Expand All @@ -151,10 +148,7 @@ class TypeSupport : public std::shared_ptr<fastdds::dds::TopicDataType>
*/
RTPS_DllAPI virtual bool deserialize(
fastrtps::rtps::SerializedPayload_t* payload,
void* data)
{
return get()->deserialize(payload, data);
}
void* data);

/**
* @brief Getter for the SerializedSizeProvider
Expand Down
4 changes: 1 addition & 3 deletions include/fastdds/rtps/reader/StatefulReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class StatefulReader : public RTPSReader
*/
inline size_t getMatchedWritersSize() const
{
return matched_writers_.size() + matched_datasharing_writers_.size();
return matched_writers_.size();
}

/*!
Expand Down Expand Up @@ -356,8 +356,6 @@ class StatefulReader : public RTPSReader
bool disable_positive_acks_;
//! False when being destroyed
bool is_alive_;
//! Vector containing pointers to the active DataSharing WriterProxies.
ResourceLimitedVector<WriterProxy*> matched_datasharing_writers_;
};

} /* namespace rtps */
Expand Down
1 change: 1 addition & 0 deletions include/fastdds/rtps/reader/StatelessReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ class StatelessReader : public RTPSReader
GUID_t persistence_guid;
bool has_manual_topic_liveliness = false;
CacheChange_t* fragmented_change = nullptr;
bool is_datasharing = false;
};

bool acceptMsgFrom(
Expand Down
7 changes: 0 additions & 7 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,13 +440,6 @@ class RTPSWriter : public Endpoint, public RTPSMessageSenderInterface
*/
bool is_datasharing_compatible() const;

/**
* @param source_timestamp the timestamp of the payload we want to recycle
* @return whether a payload with the given source timestamp can be reused for a new change
*/
virtual bool is_datasharing_payload_reusable(
const Time_t& source_timestamp) const = 0;

protected:

//!Is the data sent directly or announced by HB and THEN sent to the ones who ask for it?.
Expand Down
7 changes: 0 additions & 7 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,6 @@ class StatefulWriter : public RTPSWriter
*/
const fastdds::rtps::IReaderDataFilter* reader_data_filter() const;

/**
* @param source_timestamp the timestamp of the payload we want to recycle
* @return whether a payload with the given source timestamp can be reused for a new change
*/
bool is_datasharing_payload_reusable(
const Time_t& source_timestamp) const override;

private:

bool is_acked_by_all(
Expand Down
7 changes: 0 additions & 7 deletions include/fastdds/rtps/writer/StatelessWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,6 @@ class StatelessWriter : public RTPSWriter
+ matched_datasharing_readers_.size();
}

/**
* @param source_timestamp the timestamp of the payload we want to recycle
* @return whether a payload with the given source timestamp can be reused for a new change
*/
bool is_datasharing_payload_reusable(
const Time_t& source_timestamp) const override;

private:

void init(
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
bool was_loaned = check_and_remove_loan(data, payload);
if (!was_loaned)
{
if (!get_free_payload_from_pool(type_->getSerializedSizeProvider(data), payload, max_blocking_time))
if (!get_free_payload_from_pool(type_->getSerializedSizeProvider(data), payload))
{
return ReturnCode_t::RETCODE_OUT_OF_RESOURCES;
}
Expand Down
40 changes: 0 additions & 40 deletions src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,46 +461,6 @@ class DataWriterImpl
const fastrtps::rtps::WriterAttributes& writer_attributes,
bool& is_datasharing_compatible) const;

template<typename SizeFunctor>
bool get_free_payload_from_pool(
const SizeFunctor& size_getter,
PayloadInfo_t& payload,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
CacheChange_t change;
if (!payload_pool_)
{
return false;
}

uint32_t size = fixed_payload_size_ ? fixed_payload_size_ : size_getter();
if (is_data_sharing_compatible_)
{
auto pool = std::dynamic_pointer_cast<eprosima::fastrtps::rtps::DataSharingPayloadPool>(payload_pool_);
assert (pool != nullptr);

bool payload_reserved = pool->wait_until(max_blocking_time,
[&]()
{
return pool->get_payload(size, change);
});
if (!payload_reserved)
{
return false;
}
}
else
{
if (!payload_pool_->get_payload(size, change))
{
return false;
}
}

payload.move_from_change(change);
return true;
}

template<typename SizeFunctor>
bool get_free_payload_from_pool(
const SizeFunctor& size_getter,
Expand Down
85 changes: 78 additions & 7 deletions src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/reader/RTPSReader.h>

#include <rtps/reader/WriterProxy.h>
#include <rtps/DataSharing/DataSharingPayloadPool.hpp>


namespace eprosima {
namespace fastdds {
namespace dds {
Expand All @@ -52,6 +56,7 @@ struct ReadTakeCommand
using RTPSReader = eprosima::fastrtps::rtps::RTPSReader;
using WriterProxy = eprosima::fastrtps::rtps::WriterProxy;
using SampleInfoSeq = LoanableTypedCollection<SampleInfo>;
using DataSharingPayloadPool = eprosima::fastrtps::rtps::DataSharingPayloadPool;

ReadTakeCommand(
DataReaderImpl& reader,
Expand Down Expand Up @@ -113,7 +118,18 @@ struct ReadTakeCommand
{
WriterProxy* wp = nullptr;
bool is_future_change = false;
if (!reader_->begin_sample_access_nts(change, wp, is_future_change))
bool remove_change = false;
if (reader_->begin_sample_access_nts(change, wp, is_future_change))
{
//Check if the payload is dirty
remove_change = !check_datasharing_validity(change, data_values_.has_ownership(), wp);
}
else
{
remove_change = true;
}

if (remove_change)
{
// Remove from history
history_.remove_change_sub(change, it);
Expand All @@ -130,9 +146,27 @@ struct ReadTakeCommand
}

// Add sample and info to collections
bool added = add_sample(*it);
ReturnCode_t previous_return_value = return_value_;
bool added = add_sample(change, remove_change);
reader_->end_sample_access_nts(change, wp, added);
if (added && take_samples)

// Check if the payload is dirty
if (added && !check_datasharing_validity(change, data_values_.has_ownership(), wp))
{
// Decrement length of collections
--current_slot_;
++remaining_samples_;
data_values_.length(current_slot_);
sample_infos_.length(current_slot_);

return_value_ = previous_return_value;
finished_ = false;

remove_change = true;
added = false;
}

if (remove_change || (added && take_samples))
{
// Remove from history
history_.remove_change_sub(change, it);
Expand Down Expand Up @@ -237,11 +271,13 @@ struct ReadTakeCommand
}

bool add_sample(
CacheChange_t* change)
CacheChange_t* change,
bool& deserialization_error)
{
// Mark that some data is available
return_value_ = ReturnCode_t::RETCODE_OK;
bool ret_val = false;
deserialization_error = false;

if (remaining_samples_ > 0)
{
Expand All @@ -254,7 +290,14 @@ struct ReadTakeCommand
generate_info(change);
if (sample_infos_[current_slot_].valid_data)
{
deserialize_sample(change);
if (!deserialize_sample(change))
{
// Decrement length of collections
data_values_.length(current_slot_);
sample_infos_.length(current_slot_);
deserialization_error = true;
return false;
}
}

++current_slot_;
Expand All @@ -267,21 +310,22 @@ struct ReadTakeCommand
return ret_val;
}

void deserialize_sample(
bool deserialize_sample(
CacheChange_t* change)
{
auto payload = &(change->serializedPayload);
if (data_values_.has_ownership())
{
// perform deserialization
type_->deserialize(payload, data_values_.buffer()[current_slot_]);
return type_->deserialize(payload, data_values_.buffer()[current_slot_]);
}
else
{
// loan
void* sample;
sample_pool_->get_loan(change, sample);
const_cast<void**>(data_values_.buffer())[current_slot_] = sample;
return true;
}
}

Expand Down Expand Up @@ -330,6 +374,33 @@ struct ReadTakeCommand
}
}

bool check_datasharing_validity(
CacheChange_t* change,
bool has_ownership,
WriterProxy* wp)
{
bool is_valid = true;
if (has_ownership) //< On loans the user must check the validity anyways
{
DataSharingPayloadPool* pool = dynamic_cast<DataSharingPayloadPool*>(change->payload_owner());
if (pool)
{
//Check if the payload is dirty
is_valid = pool->is_sample_valid(*change);
}
}

if (!is_valid)
{
logWarning(RTPS_READER,
"Change " << change->sequenceNumber << " from " << wp->guid() <<
" is overidden");
return false;
}

return true;
}

};

} /* namespace detail */
Expand Down
37 changes: 37 additions & 0 deletions src/cpp/fastdds/topic/TypeSupport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>

#include <fastcdr/exceptions/Exception.h>


namespace eprosima {
namespace fastdds {
namespace dds {
Expand All @@ -39,6 +42,40 @@ ReturnCode_t TypeSupport::register_type(
return participant->register_type(*this, get_type_name());
}

bool TypeSupport::serialize(
void* data,
fastrtps::rtps::SerializedPayload_t* payload)
{
bool result = false;
try
{
result = get()->serialize(data, payload);
}
catch (eprosima::fastcdr::exception::Exception& e)
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
{
result = false;
}

return result;
}

bool TypeSupport::deserialize(
fastrtps::rtps::SerializedPayload_t* payload,
void* data)
{
bool result = false;
try
{
result = get()->deserialize(payload, data);
}
catch (eprosima::fastcdr::exception::Exception& e)
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
{
result = false;
}

return result;
}

} // namespace dds
} // namespace fastdds
} // namespace eprosima
Loading