Skip to content

Commit

Permalink
Datasharing delivery refactor (#1900)
Browse files Browse the repository at this point in the history
This is a port of #1817 from <main> to <2.2.x>

* Refs 10731. DataSharingListener provides access to the pool of a matched writer

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. The writer prioritizes intraprocess over datasharing deliveries

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. The reader prioritizes intraprocess over datasharing deliveries

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Payloads not in the history of writer are considered reusable

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Move override check on the reader to ReadTakeCommand

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Remove override check on best effort

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Check deserialization errors on reader

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Avoid lock on writer when getting payload

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Test deserialization errors

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Adapt DataSharing tests to new implementation

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Remove reusability notifications to writer

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Improve check of data validity on reader

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Modify test to new behavior

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. fixup remove override check on RTPS

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. fix mocks

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. uncrustify

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Suggested changes

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Keep datasharing compatibility on the writer info

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. No error when requested loan size is zero

This corrects the regression on DataReaderTests.resource_limits

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Apply suggestions

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Atomic sequence number on datasharing node

- Make the sequence number atomic, as it signals the validity or
invalidation of the payload
- Clear the sequence number first when invalidating, set it last when
publishing
- Reset the pointer fields on the CacheChange when the pool returns no
valid data to the listener. Since the listener provides a
stack-allocated CacheChange for the pool to fill, if the return is
garbage, the destructor of the CacheChange will do unexpected things

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Protect the notification with the mutex

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Catch deserialization exceptions

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Make sure linters do not complain of void returns

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Uncrustify

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10731. Remove unused argument

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>
  • Loading branch information
IkerLuengo authored Apr 29, 2021
1 parent 079a48d commit 5f93fec
Show file tree
Hide file tree
Showing 32 changed files with 573 additions and 669 deletions.
10 changes: 2 additions & 8 deletions include/fastdds/dds/topic/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,7 @@ class TypeSupport : public std::shared_ptr<fastdds::dds::TopicDataType>
*/
RTPS_DllAPI virtual bool serialize(
void* data,
fastrtps::rtps::SerializedPayload_t* payload)
{
return get()->serialize(data, payload);
}
fastrtps::rtps::SerializedPayload_t* payload);

/**
* @brief Deserializes the data
Expand All @@ -151,10 +148,7 @@ class TypeSupport : public std::shared_ptr<fastdds::dds::TopicDataType>
*/
RTPS_DllAPI virtual bool deserialize(
fastrtps::rtps::SerializedPayload_t* payload,
void* data)
{
return get()->deserialize(payload, data);
}
void* data);

/**
* @brief Getter for the SerializedSizeProvider
Expand Down
4 changes: 1 addition & 3 deletions include/fastdds/rtps/reader/StatefulReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class StatefulReader : public RTPSReader
*/
inline size_t getMatchedWritersSize() const
{
return matched_writers_.size() + matched_datasharing_writers_.size();
return matched_writers_.size();
}

/*!
Expand Down Expand Up @@ -356,8 +356,6 @@ class StatefulReader : public RTPSReader
bool disable_positive_acks_;
//! False when being destroyed
bool is_alive_;
//! Vector containing pointers to the active DataSharing WriterProxies.
ResourceLimitedVector<WriterProxy*> matched_datasharing_writers_;
};

} /* namespace rtps */
Expand Down
1 change: 1 addition & 0 deletions include/fastdds/rtps/reader/StatelessReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ class StatelessReader : public RTPSReader
GUID_t persistence_guid;
bool has_manual_topic_liveliness = false;
CacheChange_t* fragmented_change = nullptr;
bool is_datasharing = false;
};

bool acceptMsgFrom(
Expand Down
7 changes: 0 additions & 7 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,6 @@ class RTPSWriter : public Endpoint, public RTPSMessageSenderInterface
*/
bool is_datasharing_compatible() const;

/**
* @param source_timestamp the timestamp of the payload we want to recycle
* @return whether a payload with the given source timestamp can be reused for a new change
*/
virtual bool is_datasharing_payload_reusable(
const Time_t& source_timestamp) const = 0;

protected:

//!Is the data sent directly or announced by HB and THEN sent to the ones who ask for it?.
Expand Down
7 changes: 0 additions & 7 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,6 @@ class StatefulWriter : public RTPSWriter
*/
const fastdds::rtps::IReaderDataFilter* reader_data_filter() const;

/**
* @param source_timestamp the timestamp of the payload we want to recycle
* @return whether a payload with the given source timestamp can be reused for a new change
*/
bool is_datasharing_payload_reusable(
const Time_t& source_timestamp) const override;

private:

bool is_acked_by_all(
Expand Down
7 changes: 0 additions & 7 deletions include/fastdds/rtps/writer/StatelessWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,6 @@ class StatelessWriter : public RTPSWriter
+ matched_datasharing_readers_.size();
}

/**
* @param source_timestamp the timestamp of the payload we want to recycle
* @return whether a payload with the given source timestamp can be reused for a new change
*/
bool is_datasharing_payload_reusable(
const Time_t& source_timestamp) const override;

private:

void init(
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
bool was_loaned = check_and_remove_loan(data, payload);
if (!was_loaned)
{
if (!get_free_payload_from_pool(type_->getSerializedSizeProvider(data), payload, max_blocking_time))
if (!get_free_payload_from_pool(type_->getSerializedSizeProvider(data), payload))
{
return ReturnCode_t::RETCODE_OUT_OF_RESOURCES;
}
Expand Down
40 changes: 0 additions & 40 deletions src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,46 +450,6 @@ class DataWriterImpl
const fastrtps::rtps::WriterAttributes& writer_attributes,
bool& is_datasharing_compatible) const;

template<typename SizeFunctor>
bool get_free_payload_from_pool(
const SizeFunctor& size_getter,
PayloadInfo_t& payload,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
CacheChange_t change;
if (!payload_pool_)
{
return false;
}

uint32_t size = fixed_payload_size_ ? fixed_payload_size_ : size_getter();
if (is_data_sharing_compatible_)
{
auto pool = std::dynamic_pointer_cast<eprosima::fastrtps::rtps::DataSharingPayloadPool>(payload_pool_);
assert (pool != nullptr);

bool payload_reserved = pool->wait_until(max_blocking_time,
[&]()
{
return pool->get_payload(size, change);
});
if (!payload_reserved)
{
return false;
}
}
else
{
if (!payload_pool_->get_payload(size, change))
{
return false;
}
}

payload.move_from_change(change);
return true;
}

template<typename SizeFunctor>
bool get_free_payload_from_pool(
const SizeFunctor& size_getter,
Expand Down
85 changes: 78 additions & 7 deletions src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/reader/RTPSReader.h>

#include <rtps/reader/WriterProxy.h>
#include <rtps/DataSharing/DataSharingPayloadPool.hpp>


namespace eprosima {
namespace fastdds {
namespace dds {
Expand All @@ -52,6 +56,7 @@ struct ReadTakeCommand
using RTPSReader = eprosima::fastrtps::rtps::RTPSReader;
using WriterProxy = eprosima::fastrtps::rtps::WriterProxy;
using SampleInfoSeq = LoanableTypedCollection<SampleInfo>;
using DataSharingPayloadPool = eprosima::fastrtps::rtps::DataSharingPayloadPool;

ReadTakeCommand(
DataReaderImpl& reader,
Expand Down Expand Up @@ -113,7 +118,18 @@ struct ReadTakeCommand
{
WriterProxy* wp = nullptr;
bool is_future_change = false;
if (!reader_->begin_sample_access_nts(change, wp, is_future_change))
bool remove_change = false;
if (reader_->begin_sample_access_nts(change, wp, is_future_change))
{
//Check if the payload is dirty
remove_change = !check_datasharing_validity(change, data_values_.has_ownership(), wp);
}
else
{
remove_change = true;
}

if (remove_change)
{
// Remove from history
history_.remove_change_sub(change, it);
Expand All @@ -130,9 +146,27 @@ struct ReadTakeCommand
}

// Add sample and info to collections
bool added = add_sample(*it);
ReturnCode_t previous_return_value = return_value_;
bool added = add_sample(change, remove_change);
reader_->end_sample_access_nts(change, wp, added);
if (added && take_samples)

// Check if the payload is dirty
if (added && !check_datasharing_validity(change, data_values_.has_ownership(), wp))
{
// Decrement length of collections
--current_slot_;
++remaining_samples_;
data_values_.length(current_slot_);
sample_infos_.length(current_slot_);

return_value_ = previous_return_value;
finished_ = false;

remove_change = true;
added = false;
}

if (remove_change || (added && take_samples))
{
// Remove from history
history_.remove_change_sub(change, it);
Expand Down Expand Up @@ -237,11 +271,13 @@ struct ReadTakeCommand
}

bool add_sample(
CacheChange_t* change)
CacheChange_t* change,
bool& deserialization_error)
{
// Mark that some data is available
return_value_ = ReturnCode_t::RETCODE_OK;
bool ret_val = false;
deserialization_error = false;

if (remaining_samples_ > 0)
{
Expand All @@ -254,7 +290,14 @@ struct ReadTakeCommand
generate_info(change);
if (sample_infos_[current_slot_].valid_data)
{
deserialize_sample(change);
if (!deserialize_sample(change))
{
// Decrement length of collections
data_values_.length(current_slot_);
sample_infos_.length(current_slot_);
deserialization_error = true;
return false;
}
}

++current_slot_;
Expand All @@ -267,21 +310,22 @@ struct ReadTakeCommand
return ret_val;
}

void deserialize_sample(
bool deserialize_sample(
CacheChange_t* change)
{
auto payload = &(change->serializedPayload);
if (data_values_.has_ownership())
{
// perform deserialization
type_->deserialize(payload, data_values_.buffer()[current_slot_]);
return type_->deserialize(payload, data_values_.buffer()[current_slot_]);
}
else
{
// loan
void* sample;
sample_pool_->get_loan(change, sample);
const_cast<void**>(data_values_.buffer())[current_slot_] = sample;
return true;
}
}

Expand Down Expand Up @@ -329,6 +373,33 @@ struct ReadTakeCommand
}
}

bool check_datasharing_validity(
CacheChange_t* change,
bool has_ownership,
WriterProxy* wp)
{
bool is_valid = true;
if (has_ownership) //< On loans the user must check the validity anyways
{
DataSharingPayloadPool* pool = dynamic_cast<DataSharingPayloadPool*>(change->payload_owner());
if (pool)
{
//Check if the payload is dirty
is_valid = pool->is_sample_valid(*change);
}
}

if (!is_valid)
{
logWarning(RTPS_READER,
"Change " << change->sequenceNumber << " from " << wp->guid() <<
" is overidden");
return false;
}

return true;
}

};

} /* namespace detail */
Expand Down
37 changes: 37 additions & 0 deletions src/cpp/fastdds/topic/TypeSupport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>

#include <fastcdr/exceptions/Exception.h>


namespace eprosima {
namespace fastdds {
namespace dds {
Expand All @@ -39,6 +42,40 @@ ReturnCode_t TypeSupport::register_type(
return participant->register_type(*this, get_type_name());
}

bool TypeSupport::serialize(
void* data,
fastrtps::rtps::SerializedPayload_t* payload)
{
bool result = false;
try
{
result = get()->serialize(data, payload);
}
catch (eprosima::fastcdr::exception::Exception&)
{
result = false;
}

return result;
}

bool TypeSupport::deserialize(
fastrtps::rtps::SerializedPayload_t* payload,
void* data)
{
bool result = false;
try
{
result = get()->deserialize(payload, data);
}
catch (eprosima::fastcdr::exception::Exception&)
{
result = false;
}

return result;
}

} // namespace dds
} // namespace fastdds
} // namespace eprosima
Loading

0 comments on commit 5f93fec

Please sign in to comment.