From 5f93fecdd166c3c61119a2424b1713b718ed7e32 Mon Sep 17 00:00:00 2001 From: IkerLuengo <57146230+IkerLuengo@users.noreply.github.com> Date: Thu, 29 Apr 2021 11:09:35 +0200 Subject: [PATCH] Datasharing delivery refactor (#1900) This is a port of #1817 from
to <2.2.x> * Refs 10731. DataSharingListener provides access to the pool of a matched writer Signed-off-by: Iker Luengo * Refs 10731. The writer prioritizes intraprocess over datasharing deliveries Signed-off-by: Iker Luengo * Refs 10731. The reader prioritizes intraprocess over datasharing deliveries Signed-off-by: Iker Luengo * Refs 10731. Payloads not in the history of writer are considered reusable Signed-off-by: Iker Luengo * Refs 10731. Move override check on the reader to ReadTakeCommand Signed-off-by: Iker Luengo * Refs 10731. Remove override check on best effort Signed-off-by: Iker Luengo * Refs 10731. Check deserialization errors on reader Signed-off-by: Iker Luengo * Refs 10731. Avoid lock on writer when getting payload Signed-off-by: Iker Luengo * Refs 10731. Test deserialization errors Signed-off-by: Iker Luengo * Refs 10731. Adapt DataSharing tests to new implementation Signed-off-by: Iker Luengo * Refs 10731. Remove reusability notifications to writer Signed-off-by: Iker Luengo * Refs 10731. Improve check of data validity on reader Signed-off-by: Iker Luengo * Refs 10731. Modify test to new behavior Signed-off-by: Iker Luengo * Refs 10731. fixup remove override check on RTPS Signed-off-by: Iker Luengo * Refs 10731. fix mocks Signed-off-by: Iker Luengo * Refs 10731. uncrustify Signed-off-by: Iker Luengo * Refs 10731. Suggested changes Signed-off-by: Iker Luengo * Refs 10731. Keep datasharing compatibility on the writer info Signed-off-by: Iker Luengo * Refs 10731. No error when requested loan size is zero This corrects the regression on DataReaderTests.resource_limits Signed-off-by: Iker Luengo * Refs 10731. Apply suggestions Signed-off-by: Iker Luengo * Refs 10731. Atomic sequence number on datasharing node - Make the sequence number atomic, as it signals the validity or invalidation of the payload - Clear the sequence number first when invalidating, set it last when publishing - Reset the pointer fields on the CacheChange when the pool returns no valid data to the listener. Since the listener provides a stack-allocated CacheChange for the pool to fill, if the return is garbage, the destructor of the CacheChange will do unexpected things Signed-off-by: Iker Luengo * Refs 10731. Protect the notification with the mutex Signed-off-by: Iker Luengo * Refs 10731. Catch deserialization exceptions Signed-off-by: Iker Luengo * Refs 10731. Make sure linters do not complain of void returns Signed-off-by: Iker Luengo * Refs 10731. Uncrustify Signed-off-by: Iker Luengo * Refs 10731. Remove unused argument Signed-off-by: Iker Luengo --- include/fastdds/dds/topic/TypeSupport.hpp | 10 +- include/fastdds/rtps/reader/StatefulReader.h | 4 +- include/fastdds/rtps/reader/StatelessReader.h | 1 + include/fastdds/rtps/writer/RTPSWriter.h | 7 - include/fastdds/rtps/writer/StatefulWriter.h | 7 - include/fastdds/rtps/writer/StatelessWriter.h | 7 - src/cpp/fastdds/publisher/DataWriterImpl.cpp | 2 +- src/cpp/fastdds/publisher/DataWriterImpl.hpp | 40 --- .../DataReaderImpl/ReadTakeCommand.hpp | 85 +++++- src/cpp/fastdds/topic/TypeSupport.cpp | 37 +++ .../rtps/DataSharing/DataSharingListener.cpp | 34 +-- .../rtps/DataSharing/DataSharingListener.hpp | 8 +- .../DataSharing/DataSharingNotification.cpp | 3 - .../DataSharing/DataSharingNotification.hpp | 5 +- .../rtps/DataSharing/DataSharingNotifier.hpp | 8 - .../DataSharing/DataSharingPayloadPool.cpp | 15 +- .../DataSharing/DataSharingPayloadPool.hpp | 43 +-- .../rtps/DataSharing/IDataSharingListener.hpp | 19 +- .../rtps/DataSharing/IDataSharingNotifier.hpp | 4 - src/cpp/rtps/DataSharing/ReaderPool.hpp | 69 +++-- src/cpp/rtps/DataSharing/WriterPool.hpp | 25 +- src/cpp/rtps/reader/StatefulReader.cpp | 258 ++++-------------- src/cpp/rtps/reader/StatelessReader.cpp | 236 +++++----------- src/cpp/rtps/reader/WriterProxy.cpp | 11 +- src/cpp/rtps/reader/WriterProxy.h | 24 +- src/cpp/rtps/writer/StatefulWriter.cpp | 25 +- src/cpp/rtps/writer/StatelessWriter.cpp | 29 +- .../common/BlackboxTestsPubSubHistory.cpp | 7 + .../common/DDSBlackboxTestsDataSharing.cpp | 32 ++- .../DataSharing/DataSharingPayloadPool.hpp | 8 +- .../rtps/DataSharing/ReaderPool.hpp | 40 +++ .../dds/subscriber/DataReaderTests.cpp | 139 ++++++++++ 32 files changed, 573 insertions(+), 669 deletions(-) create mode 100644 test/mock/rtps/DataSharingPayloadPool/rtps/DataSharing/ReaderPool.hpp diff --git a/include/fastdds/dds/topic/TypeSupport.hpp b/include/fastdds/dds/topic/TypeSupport.hpp index ebcbdb3615a..a17c503b7dd 100644 --- a/include/fastdds/dds/topic/TypeSupport.hpp +++ b/include/fastdds/dds/topic/TypeSupport.hpp @@ -138,10 +138,7 @@ class TypeSupport : public std::shared_ptr */ RTPS_DllAPI virtual bool serialize( void* data, - fastrtps::rtps::SerializedPayload_t* payload) - { - return get()->serialize(data, payload); - } + fastrtps::rtps::SerializedPayload_t* payload); /** * @brief Deserializes the data @@ -151,10 +148,7 @@ class TypeSupport : public std::shared_ptr */ RTPS_DllAPI virtual bool deserialize( fastrtps::rtps::SerializedPayload_t* payload, - void* data) - { - return get()->deserialize(payload, data); - } + void* data); /** * @brief Getter for the SerializedSizeProvider diff --git a/include/fastdds/rtps/reader/StatefulReader.h b/include/fastdds/rtps/reader/StatefulReader.h index 74eeabe6440..a803a5675a8 100644 --- a/include/fastdds/rtps/reader/StatefulReader.h +++ b/include/fastdds/rtps/reader/StatefulReader.h @@ -225,7 +225,7 @@ class StatefulReader : public RTPSReader */ inline size_t getMatchedWritersSize() const { - return matched_writers_.size() + matched_datasharing_writers_.size(); + return matched_writers_.size(); } /*! @@ -356,8 +356,6 @@ class StatefulReader : public RTPSReader bool disable_positive_acks_; //! False when being destroyed bool is_alive_; - //! Vector containing pointers to the active DataSharing WriterProxies. - ResourceLimitedVector matched_datasharing_writers_; }; } /* namespace rtps */ diff --git a/include/fastdds/rtps/reader/StatelessReader.h b/include/fastdds/rtps/reader/StatelessReader.h index df263f4bacd..f6079d4c03a 100644 --- a/include/fastdds/rtps/reader/StatelessReader.h +++ b/include/fastdds/rtps/reader/StatelessReader.h @@ -258,6 +258,7 @@ class StatelessReader : public RTPSReader GUID_t persistence_guid; bool has_manual_topic_liveliness = false; CacheChange_t* fragmented_change = nullptr; + bool is_datasharing = false; }; bool acceptMsgFrom( diff --git a/include/fastdds/rtps/writer/RTPSWriter.h b/include/fastdds/rtps/writer/RTPSWriter.h index 57200345d35..1889eaacec1 100644 --- a/include/fastdds/rtps/writer/RTPSWriter.h +++ b/include/fastdds/rtps/writer/RTPSWriter.h @@ -419,13 +419,6 @@ class RTPSWriter : public Endpoint, public RTPSMessageSenderInterface */ bool is_datasharing_compatible() const; - /** - * @param source_timestamp the timestamp of the payload we want to recycle - * @return whether a payload with the given source timestamp can be reused for a new change - */ - virtual bool is_datasharing_payload_reusable( - const Time_t& source_timestamp) const = 0; - protected: //!Is the data sent directly or announced by HB and THEN sent to the ones who ask for it?. diff --git a/include/fastdds/rtps/writer/StatefulWriter.h b/include/fastdds/rtps/writer/StatefulWriter.h index 973f32d47d0..55b0f64513f 100644 --- a/include/fastdds/rtps/writer/StatefulWriter.h +++ b/include/fastdds/rtps/writer/StatefulWriter.h @@ -381,13 +381,6 @@ class StatefulWriter : public RTPSWriter */ const fastdds::rtps::IReaderDataFilter* reader_data_filter() const; - /** - * @param source_timestamp the timestamp of the payload we want to recycle - * @return whether a payload with the given source timestamp can be reused for a new change - */ - bool is_datasharing_payload_reusable( - const Time_t& source_timestamp) const override; - private: bool is_acked_by_all( diff --git a/include/fastdds/rtps/writer/StatelessWriter.h b/include/fastdds/rtps/writer/StatelessWriter.h index 6906b6e105a..de38f1e2e1b 100644 --- a/include/fastdds/rtps/writer/StatelessWriter.h +++ b/include/fastdds/rtps/writer/StatelessWriter.h @@ -180,13 +180,6 @@ class StatelessWriter : public RTPSWriter + matched_datasharing_readers_.size(); } - /** - * @param source_timestamp the timestamp of the payload we want to recycle - * @return whether a payload with the given source timestamp can be reused for a new change - */ - bool is_datasharing_payload_reusable( - const Time_t& source_timestamp) const override; - private: void init( diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index cfa08db3674..8842d6a1869 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -679,7 +679,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change( bool was_loaned = check_and_remove_loan(data, payload); if (!was_loaned) { - if (!get_free_payload_from_pool(type_->getSerializedSizeProvider(data), payload, max_blocking_time)) + if (!get_free_payload_from_pool(type_->getSerializedSizeProvider(data), payload)) { return ReturnCode_t::RETCODE_OUT_OF_RESOURCES; } diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.hpp b/src/cpp/fastdds/publisher/DataWriterImpl.hpp index 17c3b330dae..b0f4191be76 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.hpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.hpp @@ -450,46 +450,6 @@ class DataWriterImpl const fastrtps::rtps::WriterAttributes& writer_attributes, bool& is_datasharing_compatible) const; - template - bool get_free_payload_from_pool( - const SizeFunctor& size_getter, - PayloadInfo_t& payload, - const std::chrono::time_point& max_blocking_time) - { - CacheChange_t change; - if (!payload_pool_) - { - return false; - } - - uint32_t size = fixed_payload_size_ ? fixed_payload_size_ : size_getter(); - if (is_data_sharing_compatible_) - { - auto pool = std::dynamic_pointer_cast(payload_pool_); - assert (pool != nullptr); - - bool payload_reserved = pool->wait_until(max_blocking_time, - [&]() - { - return pool->get_payload(size, change); - }); - if (!payload_reserved) - { - return false; - } - } - else - { - if (!payload_pool_->get_payload(size, change)) - { - return false; - } - } - - payload.move_from_change(change); - return true; - } - template bool get_free_payload_from_pool( const SizeFunctor& size_getter, diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp index 9e1895da4dd..9fb294d5b02 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp @@ -39,6 +39,10 @@ #include #include +#include +#include + + namespace eprosima { namespace fastdds { namespace dds { @@ -52,6 +56,7 @@ struct ReadTakeCommand using RTPSReader = eprosima::fastrtps::rtps::RTPSReader; using WriterProxy = eprosima::fastrtps::rtps::WriterProxy; using SampleInfoSeq = LoanableTypedCollection; + using DataSharingPayloadPool = eprosima::fastrtps::rtps::DataSharingPayloadPool; ReadTakeCommand( DataReaderImpl& reader, @@ -113,7 +118,18 @@ struct ReadTakeCommand { WriterProxy* wp = nullptr; bool is_future_change = false; - if (!reader_->begin_sample_access_nts(change, wp, is_future_change)) + bool remove_change = false; + if (reader_->begin_sample_access_nts(change, wp, is_future_change)) + { + //Check if the payload is dirty + remove_change = !check_datasharing_validity(change, data_values_.has_ownership(), wp); + } + else + { + remove_change = true; + } + + if (remove_change) { // Remove from history history_.remove_change_sub(change, it); @@ -130,9 +146,27 @@ struct ReadTakeCommand } // Add sample and info to collections - bool added = add_sample(*it); + ReturnCode_t previous_return_value = return_value_; + bool added = add_sample(change, remove_change); reader_->end_sample_access_nts(change, wp, added); - if (added && take_samples) + + // Check if the payload is dirty + if (added && !check_datasharing_validity(change, data_values_.has_ownership(), wp)) + { + // Decrement length of collections + --current_slot_; + ++remaining_samples_; + data_values_.length(current_slot_); + sample_infos_.length(current_slot_); + + return_value_ = previous_return_value; + finished_ = false; + + remove_change = true; + added = false; + } + + if (remove_change || (added && take_samples)) { // Remove from history history_.remove_change_sub(change, it); @@ -237,11 +271,13 @@ struct ReadTakeCommand } bool add_sample( - CacheChange_t* change) + CacheChange_t* change, + bool& deserialization_error) { // Mark that some data is available return_value_ = ReturnCode_t::RETCODE_OK; bool ret_val = false; + deserialization_error = false; if (remaining_samples_ > 0) { @@ -254,7 +290,14 @@ struct ReadTakeCommand generate_info(change); if (sample_infos_[current_slot_].valid_data) { - deserialize_sample(change); + if (!deserialize_sample(change)) + { + // Decrement length of collections + data_values_.length(current_slot_); + sample_infos_.length(current_slot_); + deserialization_error = true; + return false; + } } ++current_slot_; @@ -267,14 +310,14 @@ struct ReadTakeCommand return ret_val; } - void deserialize_sample( + bool deserialize_sample( CacheChange_t* change) { auto payload = &(change->serializedPayload); if (data_values_.has_ownership()) { // perform deserialization - type_->deserialize(payload, data_values_.buffer()[current_slot_]); + return type_->deserialize(payload, data_values_.buffer()[current_slot_]); } else { @@ -282,6 +325,7 @@ struct ReadTakeCommand void* sample; sample_pool_->get_loan(change, sample); const_cast(data_values_.buffer())[current_slot_] = sample; + return true; } } @@ -329,6 +373,33 @@ struct ReadTakeCommand } } + bool check_datasharing_validity( + CacheChange_t* change, + bool has_ownership, + WriterProxy* wp) + { + bool is_valid = true; + if (has_ownership) //< On loans the user must check the validity anyways + { + DataSharingPayloadPool* pool = dynamic_cast(change->payload_owner()); + if (pool) + { + //Check if the payload is dirty + is_valid = pool->is_sample_valid(*change); + } + } + + if (!is_valid) + { + logWarning(RTPS_READER, + "Change " << change->sequenceNumber << " from " << wp->guid() << + " is overidden"); + return false; + } + + return true; + } + }; } /* namespace detail */ diff --git a/src/cpp/fastdds/topic/TypeSupport.cpp b/src/cpp/fastdds/topic/TypeSupport.cpp index 85483925a11..f2654860e0d 100644 --- a/src/cpp/fastdds/topic/TypeSupport.cpp +++ b/src/cpp/fastdds/topic/TypeSupport.cpp @@ -20,6 +20,9 @@ #include #include +#include + + namespace eprosima { namespace fastdds { namespace dds { @@ -39,6 +42,40 @@ ReturnCode_t TypeSupport::register_type( return participant->register_type(*this, get_type_name()); } +bool TypeSupport::serialize( + void* data, + fastrtps::rtps::SerializedPayload_t* payload) +{ + bool result = false; + try + { + result = get()->serialize(data, payload); + } + catch (eprosima::fastcdr::exception::Exception&) + { + result = false; + } + + return result; +} + +bool TypeSupport::deserialize( + fastrtps::rtps::SerializedPayload_t* payload, + void* data) +{ + bool result = false; + try + { + result = get()->deserialize(payload, data); + } + catch (eprosima::fastcdr::exception::Exception&) + { + result = false; + } + + return result; +} + } // namespace dds } // namespace fastdds } // namespace eprosima diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.cpp b/src/cpp/rtps/DataSharing/DataSharingListener.cpp index 7b5155fd9ba..6fe3f61c690 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.cpp @@ -257,31 +257,21 @@ void DataSharingListener::notify( } } -void DataSharingListener::change_removed_with_timestamp( - int64_t timestamp) -{ - // This method should be called from the RTPSReader, - // then, the reader's lock is protecting the concurrency on the value updates. - if (timestamp > notification_->notification_->ack_timestamp.load()) - { - notification_->notification_->ack_timestamp.store(timestamp); - for (auto it = writer_pools_.begin(); it != writer_pools_.end(); ++it) - { - // Notify all writers in case any is waiting for a recyclable payload - it->pool->notify(); - } - } -} - -void DataSharingListener::change_added_with_timestamp( - int64_t timestamp) +std::shared_ptr DataSharingListener::get_pool_for_writer( + const GUID_t& writer_guid) { - // This method should be called from the RTPSReader, - // then, the reader's lock is protecting the concurrency on the value updates. - if (timestamp < notification_->notification_->ack_timestamp.load()) + std::lock_guard lock(mutex_); + auto it = std::find_if(writer_pools_.begin(), writer_pools_.end(), + [&writer_guid](const WriterInfo& info) + { + return info.pool->writer() == writer_guid; + } + ); + if (it != writer_pools_.end()) { - notification_->notification_->ack_timestamp.store(timestamp); + return it->pool; } + return std::shared_ptr(nullptr); } } // namespace rtps diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.hpp b/src/cpp/rtps/DataSharing/DataSharingListener.hpp index 2a29a09fc6b..e76a25d6fb7 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.hpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include @@ -77,11 +76,8 @@ class DataSharingListener : public IDataSharingListener void notify( bool same_thread) override; - void change_removed_with_timestamp( - int64_t timestamp) override; - - void change_added_with_timestamp( - int64_t timestamp) override; + std::shared_ptr get_pool_for_writer( + const GUID_t& writer_guid) override; protected: diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp index 1f636b012b4..7a0023f5883 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.cpp @@ -98,9 +98,6 @@ bool DataSharingNotification::create_and_init_notification( // Alloc and initialize the Node notification_ = segment_->get().construct("notification_node")(); notification_->new_data.store(false); - Time_t now; - Time_t::now(now); - notification_->ack_timestamp.store(now.to_ns()); } catch (std::exception& e) { diff --git a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp index e6d8fbffc0c..639ff9cbbcf 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotification.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotification.hpp @@ -53,7 +53,9 @@ class DataSharingNotification */ inline void notify() { + std::unique_lock lock(notification_->notification_mutex); notification_->new_data.store(true); + lock.unlock(); notification_->notification_cv.notify_all(); } @@ -101,9 +103,6 @@ class DataSharingNotification //! New data available std::atomic new_data; - - //! Timestamp of the reader's first sample NOT ack'd - std::atomic ack_timestamp; }; #pragma warning(pop) diff --git a/src/cpp/rtps/DataSharing/DataSharingNotifier.hpp b/src/cpp/rtps/DataSharing/DataSharingNotifier.hpp index 200bf0eb34e..c1b5b81e45e 100644 --- a/src/cpp/rtps/DataSharing/DataSharingNotifier.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingNotifier.hpp @@ -84,14 +84,6 @@ class DataSharingNotifier : public IDataSharingNotifier } } - /** - * @return the ACK'd timestamp - */ - int64_t ack_timestamp() const override - { - return shared_notification_->notification_->ack_timestamp.load(); - } - protected: std::shared_ptr shared_notification_; diff --git a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.cpp b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.cpp index 9a8b3282bdc..483ba57b03c 100644 --- a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.cpp @@ -81,10 +81,10 @@ bool DataSharingPayloadPool::check_sequence_number( return (PayloadNode::get_from_data(data)->sequence_number() == sn); } -DataSharingPayloadPool::sharable_mutex& DataSharingPayloadPool::shared_mutex( - octet* data) +bool DataSharingPayloadPool::is_sample_valid( + const CacheChange_t& change) const { - return PayloadNode::get_from_data(data)->mutex(); + return check_sequence_number(change.serializedPayload.data, change.sequenceNumber); } std::shared_ptr DataSharingPayloadPool::get_reader_pool( @@ -104,15 +104,6 @@ std::shared_ptr DataSharingPayloadPool::get_writer_pool( config.payload_initial_size); } -/** - * Notifies to the writer - */ -void DataSharingPayloadPool::notify() -{ - logInfo(RTPS_READER, "Notifying writer " << writer()); - descriptor_->notification_cv.notify_all(); -} - } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp index ded64c0aad9..be75e61d332 100644 --- a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp @@ -126,33 +126,12 @@ class DataSharingPayloadPool : public IPayloadPool uint32_t last_liveliness_sequence() const; - /** - * Notifies to the writer - */ - void notify(); - - template - bool wait_until( - const std::chrono::time_point& max_blocking_time, - const ConditionFunctor& condition) - { - std::unique_lock lock(descriptor_->notification_mutex); - bool success = false; - descriptor_->notification_cv.timed_wait(lock, max_blocking_time, - [&success, &condition]() - { - success = condition(); - return success; - }); - return success; - } - static bool check_sequence_number( const octet* data, const SequenceNumber_t& sn); - static sharable_mutex& shared_mutex( - octet*); + bool is_sample_valid( + const CacheChange_t& change) const; protected: @@ -189,7 +168,7 @@ class DataSharingPayloadPool : public IPayloadPool Time_t source_timestamp; // Sequence number of the payload inside the writer - SequenceNumber_t sequence_number; + std::atomic sequence_number; // GUID of the writer that created the payload GUID_t writer_GUID; @@ -217,7 +196,8 @@ class DataSharingPayloadPool : public IPayloadPool void reset() { - metadata_.sequence_number = c_SequenceNumber_Unknown; + // Reset the sequence number first, it signals the data is not valid anymore + metadata_.sequence_number.store(c_SequenceNumber_Unknown, std::memory_order_relaxed); metadata_.status = fastrtps::rtps::ChangeKind_t::ALIVE; metadata_.data_length = 0; metadata_.writer_GUID = c_Guid_Unknown; @@ -277,13 +257,14 @@ class DataSharingPayloadPool : public IPayloadPool SequenceNumber_t sequence_number() const { - return metadata_.sequence_number; + SequenceNumber_t value = metadata_.sequence_number.load(std::memory_order_relaxed); + return value; } void sequence_number( SequenceNumber_t sequence_number) { - metadata_.sequence_number = sequence_number; + metadata_.sequence_number.store(sequence_number, std::memory_order_relaxed); } Time_t source_timestamp() const @@ -330,11 +311,6 @@ class DataSharingPayloadPool : public IPayloadPool metadata_.related_sample_identity = identity; } - sharable_mutex& mutex() - { - return metadata_.mutex; - } - private: PayloadNodeMetaData metadata_; @@ -347,9 +323,6 @@ class DataSharingPayloadPool : public IPayloadPool uint64_t notified_begin; //< The index of the oldest history entry already notified (ready to read) uint64_t notified_end; //< The index of the history entry that will be notified next uint32_t liveliness_sequence; //< The ID of the last liveliness assertion sent by the writer - - Segment::condition_variable notification_cv; //< CV to wait for notifications from the reader - Segment::mutex notification_mutex; //< synchronization mutex }; #pragma warning(pop) diff --git a/src/cpp/rtps/DataSharing/IDataSharingListener.hpp b/src/cpp/rtps/DataSharing/IDataSharingListener.hpp index 5b96aa082b9..ab3d4694fa3 100644 --- a/src/cpp/rtps/DataSharing/IDataSharingListener.hpp +++ b/src/cpp/rtps/DataSharing/IDataSharingListener.hpp @@ -21,7 +21,7 @@ #include #include -#include +#include #include #include @@ -99,20 +99,13 @@ class IDataSharingListener bool same_thread) = 0; /** - * Updates the ACK'd timestamp of the notification due to a change read/removed from the history + * Returns the local datasharing pool for the specified remote writer * - * @param timestamp the source timestamp of the change + * @param writer_guid The GUID of the remote writer + * @return the local pool for the given writer or null if the writer is not being listened */ - virtual void change_removed_with_timestamp( - int64_t timestamp) = 0; - - /** - * Updates the ACK'd timestamp of the notification due to a new change added to the history - * - * @param timestamp the source timestamp of the added change - */ - virtual void change_added_with_timestamp( - int64_t timestamp) = 0; + virtual std::shared_ptr get_pool_for_writer( + const GUID_t& writer_guid) = 0; }; diff --git a/src/cpp/rtps/DataSharing/IDataSharingNotifier.hpp b/src/cpp/rtps/DataSharing/IDataSharingNotifier.hpp index f4f18b6c27b..34621ae70d6 100644 --- a/src/cpp/rtps/DataSharing/IDataSharingNotifier.hpp +++ b/src/cpp/rtps/DataSharing/IDataSharingNotifier.hpp @@ -57,10 +57,6 @@ class IDataSharingNotifier */ virtual void notify() = 0; - /** - * @return the ACK'd timestamp - */ - virtual int64_t ack_timestamp() const = 0; }; diff --git a/src/cpp/rtps/DataSharing/ReaderPool.hpp b/src/cpp/rtps/DataSharing/ReaderPool.hpp index 43d75eb9e45..8c1b9db7bdd 100644 --- a/src/cpp/rtps/DataSharing/ReaderPool.hpp +++ b/src/cpp/rtps/DataSharing/ReaderPool.hpp @@ -55,9 +55,6 @@ class ReaderPool : public DataSharingPayloadPool IPayloadPool*& data_owner, CacheChange_t& cache_change) override { - assert(data_owner == this || data_owner == nullptr); - - assert(data_owner == this); if (data_owner == this) { cache_change.serializedPayload.data = data.data; @@ -67,7 +64,13 @@ class ReaderPool : public DataSharingPayloadPool return true; } - return false; + // If owner is not this, then it must be an intraprocess datasharing writer + assert(nullptr != dynamic_cast(data_owner)); + PayloadNode* payload = PayloadNode::get_from_data(data.data); + + // No need to check validity, on intraprocess there is no override of payloads + read_from_shared_history(cache_change, payload); + return true; } bool release_payload( @@ -85,7 +88,7 @@ class ReaderPool : public DataSharingPayloadPool segment_id_ = writer_guid; segment_name_ = generate_segment_name(shared_dir, writer_guid); - //Open the segment + // Open the segment try { segment_ = std::unique_ptr( @@ -158,30 +161,7 @@ class ReaderPool : public DataSharingPayloadPool // history_[next_payload_] contains the offset to the payload PayloadNode* payload = static_cast( segment_->get_address_from_offset(history_[static_cast(next_payload_)])); - - // The SN is the first thing to be invalidated on the writer - cache_change.sequenceNumber = payload->sequence_number(); - if (cache_change.sequenceNumber == c_SequenceNumber_Unknown) - { - // Reset by the writer. Discard and continue - advance(next_payload_); - logWarning(RTPS_READER, "Dirty data detected on datasharing writer " << writer()); - continue; - } - - cache_change.serializedPayload.data = payload->data(); - cache_change.serializedPayload.max_size = payload->data_length(); - cache_change.serializedPayload.length = payload->data_length(); - - cache_change.kind = static_cast(payload->status()); - cache_change.writerGUID = payload->writer_GUID(); - cache_change.instanceHandle = payload->instance_handle(); - cache_change.sourceTimestamp = payload->source_timestamp(); - cache_change.write_params.sample_identity(payload->related_sample_identity()); - - cache_change.payload_owner(this); - - if (payload->sequence_number() != cache_change.sequenceNumber) + if (!read_from_shared_history(cache_change, payload)) { // Overriden while retrieving. Discard and continue advance(next_payload_); @@ -200,7 +180,38 @@ class ReaderPool : public DataSharingPayloadPool return; } + // Reset the data (may cause errors later on) cache_change.sequenceNumber = c_SequenceNumber_Unknown; + cache_change.serializedPayload.data = nullptr; + cache_change.payload_owner(nullptr); + } + + bool read_from_shared_history( + CacheChange_t& cache_change, + PayloadNode* payload) + { + // The sequence number can be unknown already, but we defer the check to the end + cache_change.sequenceNumber = payload->sequence_number(); + + cache_change.serializedPayload.data = payload->data(); + cache_change.serializedPayload.max_size = payload->data_length(); + cache_change.serializedPayload.length = payload->data_length(); + + cache_change.kind = static_cast(payload->status()); + cache_change.writerGUID = payload->writer_GUID(); + cache_change.instanceHandle = payload->instance_handle(); + cache_change.sourceTimestamp = payload->source_timestamp(); + cache_change.write_params.sample_identity(payload->related_sample_identity()); + + SequenceNumber_t check = payload->sequence_number(); + if (check == c_SequenceNumber_Unknown || check != cache_change.sequenceNumber) + { + // data override while processing + return false; + } + + cache_change.payload_owner(this); + return true; } const SequenceNumber_t& get_last_read_sequence_number() diff --git a/src/cpp/rtps/DataSharing/WriterPool.hpp b/src/cpp/rtps/DataSharing/WriterPool.hpp index beda4b25464..1112e8010ee 100644 --- a/src/cpp/rtps/DataSharing/WriterPool.hpp +++ b/src/cpp/rtps/DataSharing/WriterPool.hpp @@ -65,27 +65,10 @@ class WriterPool : public DataSharingPayloadPool return false; } - // Look for a free payload that is recyclable - PayloadNode* payload = nullptr; - for (auto it = free_payloads_.begin(); it != free_payloads_.end(); ++it) - { - if (writer_->is_datasharing_payload_reusable((*it)->source_timestamp())) - { - payload = *it; - free_payloads_.erase(it); - break; - } - } - if (payload == nullptr) - { - return false; - } - - payload->mutex().lock(); + PayloadNode* payload = free_payloads_.front(); + free_payloads_.pop_front(); // Reset all the metadata to signal the reader that the payload is dirty payload->reset(); - // Now we can unlock - payload->mutex().unlock(); cache_change.serializedPayload.data = payload->data(); cache_change.serializedPayload.max_size = max_data_size_; @@ -264,7 +247,6 @@ class WriterPool : public DataSharingPayloadPool // Fill the payload metadata with the change info PayloadNode* node = PayloadNode::get_from_data(cache_change->serializedPayload.data); - node->sequence_number(cache_change->sequenceNumber); node->status(ALIVE); node->data_length(cache_change->serializedPayload.length); node->source_timestamp(cache_change->sourceTimestamp); @@ -275,6 +257,9 @@ class WriterPool : public DataSharingPayloadPool node->related_sample_identity(cache_change->write_params.related_sample_identity()); } + // Set the sequence number last, it signals the data is ready + node->sequence_number(cache_change->sequenceNumber); + // Add it to the history history_[static_cast(descriptor_->notified_end)] = segment_->get_offset_from_address(node); logInfo(DATASHARING_PAYLOADPOOL, "Change added to shared history" diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index dc8cc3802b2..731b9edff78 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -66,10 +66,6 @@ StatefulReader::~StatefulReader() { delete(writer); } - for (WriterProxy* writer : matched_datasharing_writers_) - { - delete(writer); - } for (WriterProxy* writer : matched_writers_pool_) { delete(writer); @@ -91,7 +87,6 @@ StatefulReader::StatefulReader( , proxy_changes_config_(resource_limits_from_history(hist->m_att, 0)) , disable_positive_acks_(att.disable_positive_acks) , is_alive_(true) - , matched_datasharing_writers_(att.matched_writers_allocation) { init(pimpl, att); } @@ -112,7 +107,6 @@ StatefulReader::StatefulReader( , proxy_changes_config_(resource_limits_from_history(hist->m_att, 0)) , disable_positive_acks_(att.disable_positive_acks) , is_alive_(true) - , matched_datasharing_writers_(att.matched_writers_allocation) { init(pimpl, att); } @@ -134,7 +128,6 @@ StatefulReader::StatefulReader( , proxy_changes_config_(resource_limits_from_history(hist->m_att, 0)) , disable_positive_acks_(att.disable_positive_acks) , is_alive_(true) - , matched_datasharing_writers_(att.matched_writers_allocation) { init(pimpl, att); } @@ -163,7 +156,7 @@ bool StatefulReader::matched_writer_add( } bool is_datasharing = is_datasharing_compatible_with(wdata); - bool is_same_process = !is_datasharing && RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid()); + bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid()); for (WriterProxy* it : matched_writers_) { @@ -182,20 +175,6 @@ bool StatefulReader::matched_writer_add( } } - for (WriterProxy* it : matched_datasharing_writers_) - { - if (it->guid() == wdata.guid()) - { - logInfo(RTPS_READER, "Attempting to add existing datasharing writer, updating information"); - it->update(wdata); - for (const Locator_t& locator : it->remote_locators_shrinked()) - { - getRTPSParticipant()->createSenderResources(locator); - } - return false; - } - } - // Get a writer proxy from the inactive pool (or create a new one if necessary and allowed) WriterProxy* wp = nullptr; if (matched_writers_pool_.empty()) @@ -223,7 +202,7 @@ bool StatefulReader::matched_writer_add( add_persistence_guid(wdata.guid(), wdata.persistence_guid()); initial_sequence = get_last_notified(wdata.guid()); - wp->start(wdata, initial_sequence); + wp->start(wdata, initial_sequence, is_datasharing); if (!is_same_process) { @@ -238,7 +217,7 @@ bool StatefulReader::matched_writer_add( if (datasharing_listener_->add_datasharing_writer(wdata.guid(), m_att.durabilityKind == VOLATILE)) { - matched_datasharing_writers_.push_back(wp); + matched_writers_.push_back(wp); logInfo(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId << " with data sharing"); } @@ -252,7 +231,8 @@ bool StatefulReader::matched_writer_add( return false; } - if (m_att.durabilityKind != VOLATILE) + // Intraprocess manages durability itself + if (!is_same_process && m_att.durabilityKind != VOLATILE) { // simulate a notification to force reading of transient changes datasharing_listener_->notify(false); @@ -312,46 +292,31 @@ bool StatefulReader::matched_writer_remove( } } - for (ResourceLimitedVector::iterator it = matched_datasharing_writers_.begin(); - it != matched_datasharing_writers_.end(); + for (ResourceLimitedVector::iterator it = matched_writers_.begin(); + it != matched_writers_.end(); ++it) { if ((*it)->guid() == writer_guid) { - logInfo(RTPS_READER, - "Data sharing writer proxy " << writer_guid << " removed from " << m_guid.entityId); + logInfo(RTPS_READER, "Writer proxy " << writer_guid << " removed from " << m_guid.entityId); wproxy = *it; - matched_datasharing_writers_.erase(it); - - // If it is in the list of datasharing, it must be in the listener - bool removed_from_listener = datasharing_listener_->remove_datasharing_writer(writer_guid); - assert(removed_from_listener); - (void)removed_from_listener; - remove_changes_from(writer_guid, true); + matched_writers_.erase(it); break; } } - if (wproxy == nullptr) - { - for (ResourceLimitedVector::iterator it = matched_writers_.begin(); - it != matched_writers_.end(); - ++it) - { - if ((*it)->guid() == writer_guid) - { - logInfo(RTPS_READER, "Writer proxy " << writer_guid << " removed from " << m_guid.entityId); - wproxy = *it; - matched_writers_.erase(it); - - break; - } - } - } if (wproxy != nullptr) { remove_persistence_guid(wproxy->guid(), wproxy->persistence_guid(), removed_by_lease); + if (wproxy->is_datasharing_writer()) + { + // If it is datasharing, it must be in the listener + bool removed_from_listener = datasharing_listener_->remove_datasharing_writer(writer_guid); + assert(removed_from_listener); + (void)removed_from_listener; + remove_changes_from(writer_guid, true); + } wproxy->stop(); matched_writers_pool_.push_back(wproxy); } @@ -378,14 +343,6 @@ bool StatefulReader::matched_writer_is_matched( return true; } } - - for (WriterProxy* it : matched_datasharing_writers_) - { - if (it->guid() == writer_guid && it->is_alive()) - { - return true; - } - } } return false; @@ -433,14 +390,6 @@ bool StatefulReader::findWriterProxy( return true; } } - for (WriterProxy* it : matched_datasharing_writers_) - { - if (it->guid() == writerGUID && it->is_alive()) - { - *WP = it; - return true; - } - } return false; } @@ -500,9 +449,22 @@ bool StatefulReader::processDataMsg( // Ask payload pool to copy the payload IPayloadPool* payload_owner = change->payload_owner(); - ReaderPool* datasharing_pool = dynamic_cast(payload_owner); - if (datasharing_pool) + + if (is_datasharing_compatible_ && datasharing_listener_->writer_is_matched(change->writerGUID)) { + // We may receive the change from the listener (with owner a ReaderPool) or intraprocess (with owner a WriterPool) + ReaderPool* datasharing_pool = dynamic_cast(payload_owner); + if (!datasharing_pool) + { + datasharing_pool = datasharing_listener_->get_pool_for_writer(change->writerGUID).get(); + } + if (!datasharing_pool) + { + logWarning(RTPS_MSG_IN, IDSTRING "Problem copying DataSharing CacheChange from writer " + << change->writerGUID); + change_pool_->release_cache(change_to_add); + return false; + } datasharing_pool->get_payload(change->serializedPayload, payload_owner, *change_to_add); } else if (payload_pool_->get_payload(change->serializedPayload, payload_owner, *change_to_add)) @@ -756,15 +718,6 @@ bool StatefulReader::acceptMsgFrom( } } - for (WriterProxy* it : matched_datasharing_writers_) - { - if (it->guid() == writerId && it->is_alive()) - { - *wp = it; - return true; - } - } - // Check if it's a framework's one. In this case, m_acceptMessagesFromUnkownWriters // is an enabler for the trusted entity comparison if (m_acceptMessagesFromUnkownWriters @@ -853,15 +806,6 @@ bool StatefulReader::change_received( // initialized using this SequenceNumber_t. Note that on a SERVER the own DATA(p) may be in any // position within the WriterHistory preventing effective data exchange. update_last_notified(a_change->writerGUID, SequenceNumber_t(0, 1)); - - ReaderPool* datasharing_pool = dynamic_cast(a_change->payload_owner()); - if (datasharing_pool) - { - // Change was added to the history. May need to update datasharing ACK timestamp - // because we can receive changes in a different order (due to processing of writers or late-joiners) - datasharing_listener_->change_added_with_timestamp(a_change->sourceTimestamp.to_ns()); - } - if (getListener() != nullptr) { getListener()->onNewCacheChangeAdded((RTPSReader*)this, a_change); @@ -900,14 +844,6 @@ bool StatefulReader::change_received( ret = prox->received_change_set(a_change->sequenceNumber); } - ReaderPool* datasharing_pool = dynamic_cast(a_change->payload_owner()); - if (datasharing_pool && ret) - { - // Change was added to the history. May need to update datasharing ACK timestamp - // because we can receive changes in a different order (due to processing of writers or late-joiners) - datasharing_listener_->change_added_with_timestamp(a_change->sourceTimestamp.to_ns()); - } - NotifyChanges(prox); return ret; @@ -997,34 +933,13 @@ bool StatefulReader::nextUntakenCache( { if (this->matched_writer_lookup((*it)->writerGUID, &wp)) { - if (is_datasharing_compatible_ && datasharing_listener_->writer_is_matched((*it)->writerGUID)) - { - // Lock the payload. The lock will NOT be freed for the returned change - DataSharingPayloadPool::shared_mutex((*it)->serializedPayload.data).lock_sharable(); - - //Check if the payload is dirty - if (!DataSharingPayloadPool::check_sequence_number( - (*it)->serializedPayload.data, (*it)->sequenceNumber)) - { - // Unlock, remove and continue - DataSharingPayloadPool::shared_mutex((*it)->serializedPayload.data).unlock_sharable(); - logWarning(RTPS_READER, - "Removing change " << (*it)->sequenceNumber << " from " << (*it)->writerGUID << - " because is overidden"); - it = mp_history->remove_change(it); - continue; - } - } - else + // TODO Revisar la comprobacion + SequenceNumber_t seq; + seq = wp->available_changes_max(); + if (seq < (*it)->sequenceNumber) { - // TODO Revisar la comprobacion - SequenceNumber_t seq; - seq = wp->available_changes_max(); - if (seq < (*it)->sequenceNumber) - { - ++it; - continue; - } + ++it; + continue; } takeok = true; } @@ -1078,33 +993,12 @@ bool StatefulReader::nextUnreadCache( if (matched_writer_lookup((*it)->writerGUID, &wp)) { - if (is_datasharing_compatible_ && datasharing_listener_->writer_is_matched((*it)->writerGUID)) - { - // Lock the payload. The lock will NOT be freed for the returned change - DataSharingPayloadPool::shared_mutex((*it)->serializedPayload.data).lock_sharable(); - - //Check if the payload is dirty - if (!DataSharingPayloadPool::check_sequence_number( - (*it)->serializedPayload.data, (*it)->sequenceNumber)) - { - // Unlock, remove and continue - DataSharingPayloadPool::shared_mutex((*it)->serializedPayload.data).unlock_sharable(); - logWarning(RTPS_READER, - "Removing change " << (*it)->sequenceNumber << " from " << (*it)->writerGUID << - " because is overidden"); - it = mp_history->remove_change(it); - continue; - } - } - else + SequenceNumber_t seq; + seq = wp->available_changes_max(); + if (seq < (*it)->sequenceNumber) { - SequenceNumber_t seq; - seq = wp->available_changes_max(); - if (seq < (*it)->sequenceNumber) - { - ++it; - continue; - } + ++it; + continue; } readok = true; } @@ -1165,13 +1059,6 @@ bool StatefulReader::isInCleanState() return false; } } - for (WriterProxy* wp : matched_datasharing_writers_) - { - if (wp->number_of_changes_from_writer() != 0) - { - return false; - } - } } return true; @@ -1195,25 +1082,6 @@ bool StatefulReader::begin_sample_access_nts( if (seq < change->sequenceNumber) { is_future_change = true; - return true; - } - - if (is_datasharing_compatible_ && datasharing_listener_->writer_is_matched(writer_guid)) - { - // Lock the payload. Will remain locked until end_sample_access_nts is called - DataSharingPayloadPool::shared_mutex(change->serializedPayload.data).lock_sharable(); - - //Check if the payload is dirty - if (!DataSharingPayloadPool::check_sequence_number( - change->serializedPayload.data, change->sequenceNumber)) - { - // Unlock and return false - DataSharingPayloadPool::shared_mutex(change->serializedPayload.data).unlock_sharable(); - logWarning(RTPS_READER, - "Removing change " << change->sequenceNumber << " from " << writer_guid << - " because is overidden"); - return false; - } } return true; @@ -1245,36 +1113,22 @@ void StatefulReader::change_read_by_user( } } - std::unique_lock lock(mp_mutex); - // If not datasharing, we are done - if (!is_datasharing_compatible_ || !datasharing_listener_->writer_is_matched(writer->guid())) + if (!writer->is_datasharing_writer()) { return; } - // Unlock the payload - DataSharingPayloadPool::shared_mutex(change->serializedPayload.data).unlock_sharable(); - if (mark_as_read) { // This may not be the change read with highest SN, // need to find largest SN to ACK std::vector::iterator last_read_from_writer; - bool first_not_read_found = false; for (std::vector::iterator it = mp_history->changesBegin(); it != mp_history->changesEnd(); ++it) { if (!(*it)->isRead) { - // First update the last ACK timestamp in the shared memory - if (!first_not_read_found) - { - datasharing_listener_->change_removed_with_timestamp((*it)->sourceTimestamp.to_ns()); - first_not_read_found = true; - } - - // Then check if we have to send the ACk to the writer if ((*it)->writerGUID == writer->guid()) { if ((*it)->sequenceNumber < change->sequenceNumber) @@ -1282,27 +1136,16 @@ void StatefulReader::change_read_by_user( //There are earlier changes not read yet. Do not send ACK. return; } - acknack_count_++; - RTPSMessageGroup group(getRTPSParticipant(), this, *writer); SequenceNumberSet_t sns((*it)->sequenceNumber); - group.add_acknack(sns, acknack_count_, false); - logInfo(RTPS_READER, "Sending datasharing ACK for SN " << (*it)->sequenceNumber - 1); + send_acknack(writer, sns, *writer, false); return; } } } // Must ACK all in the writer - if (!first_not_read_found) - { - datasharing_listener_->change_removed_with_timestamp( - (*mp_history->changesRbegin())->sourceTimestamp.to_ns() + 1); - } - acknack_count_++; - RTPSMessageGroup group(getRTPSParticipant(), this, *writer); SequenceNumberSet_t sns(writer->available_changes_max() + 1); - group.add_acknack(sns, acknack_count_, false); - logInfo(RTPS_READER, "Sending datasharing ACK for last SN " << writer->available_changes_max()); + send_acknack(writer, sns, *writer, false); } } @@ -1330,7 +1173,7 @@ void StatefulReader::send_acknack( RTPSMessageGroup group(getRTPSParticipant(), this, sender); group.add_acknack(sns, acknack_count_, is_final); } - else if (!is_datasharing_compatible_ || !datasharing_listener_->writer_is_matched(writer->guid())) + else { GUID_t reader_guid = m_guid; uint32_t acknack_count = acknack_count_; @@ -1359,6 +1202,13 @@ void StatefulReader::send_acknack( return; } + // ACKNACK for datasharing writers is done for changes with status READ, not on reception + // This is handled in change_read_by_user + if (writer->is_datasharing_writer()) + { + return; + } + SequenceNumberSet_t missing_changes = writer->missing_changes(); try diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index 00a2b409c74..b903eda7e1c 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -29,6 +29,8 @@ #include #include +#include "rtps/RTPSDomainImpl.hpp" + #include #include @@ -99,7 +101,16 @@ bool StatelessReader::matched_writer_add( } } - if (is_datasharing_compatible_with(wdata)) + bool is_datasharing = is_datasharing_compatible_with(wdata); + bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid()); + + RemoteWriterInfo_t info; + info.guid = wdata.guid(); + info.persistence_guid = wdata.persistence_guid(); + info.has_manual_topic_liveliness = (MANUAL_BY_TOPIC_LIVELINESS_QOS == wdata.m_qos.m_liveliness.kind); + info.is_datasharing = is_datasharing; + + if (is_datasharing) { if (datasharing_listener_->add_datasharing_writer(wdata.guid(), m_att.durabilityKind == VOLATILE)) @@ -115,27 +126,20 @@ bool StatelessReader::matched_writer_add( return false; } - if (m_att.durabilityKind != VOLATILE) - { - // simulate a notification to force reading of transient changes - datasharing_listener_->notify(false); - } } - else + + if (matched_writers_.emplace_back(info) == nullptr) { - RemoteWriterInfo_t info; - info.guid = wdata.guid(); - info.persistence_guid = wdata.persistence_guid(); - info.has_manual_topic_liveliness = (MANUAL_BY_TOPIC_LIVELINESS_QOS == wdata.m_qos.m_liveliness.kind); - if (matched_writers_.emplace_back(info) == nullptr) + logWarning(RTPS_READER, "No space to add writer " << wdata.guid() << " to reader " << m_guid); + if (is_datasharing) { - logWarning(RTPS_READER, "No space to add writer " << wdata.guid() << " to reader " << m_guid); - return false; + datasharing_listener_->remove_datasharing_writer(wdata.guid()); } - - add_persistence_guid(info.guid, info.persistence_guid); - logInfo(RTPS_READER, "Writer " << wdata.guid() << " added to reader " << m_guid); + return false; } + logInfo(RTPS_READER, "Writer " << wdata.guid() << " added to reader " << m_guid); + + add_persistence_guid(info.guid, info.persistence_guid); m_acceptMessagesFromUnkownWriters = false; @@ -155,6 +159,14 @@ bool StatelessReader::matched_writer_add( } } + // Intraprocess manages durability itself + if (is_datasharing && !is_same_process && m_att.durabilityKind != VOLATILE) + { + // simulate a notification to force reading of transient changes + // this has to be done after the writer is added to the matched_writers or the processing may fail + datasharing_listener_->notify(false); + } + return true; } @@ -184,35 +196,26 @@ bool StatelessReader::matched_writer_remove( } } - bool found = false; - if (is_datasharing_compatible_) + ResourceLimitedVector::iterator it; + for (it = matched_writers_.begin(); it != matched_writers_.end(); ++it) { - if (datasharing_listener_->remove_datasharing_writer(writer_guid)) + if (it->guid == writer_guid) { - found = true; - logInfo(RTPS_READER, "Data sharing writer " << writer_guid << " removed from " << m_guid.entityId); - remove_changes_from(writer_guid, true); - } - } + logInfo(RTPS_READER, "Writer " << writer_guid << " removed from " << m_guid); - if (!found) - { - ResourceLimitedVector::iterator it; - for (it = matched_writers_.begin(); it != matched_writers_.end(); ++it) - { - if (it->guid == writer_guid) + if (it->is_datasharing && datasharing_listener_->remove_datasharing_writer(writer_guid)) { - logInfo(RTPS_READER, "Writer " << writer_guid << " removed from " << m_guid); - found = true; - - remove_persistence_guid(it->guid, it->persistence_guid, removed_by_lease); - matched_writers_.erase(it); - break; + logInfo(RTPS_READER, "Data sharing writer " << writer_guid << " removed from " << m_guid.entityId); + remove_changes_from(writer_guid, true); } + + remove_persistence_guid(it->guid, it->persistence_guid, removed_by_lease); + matched_writers_.erase(it); + return true; } } - return found; + return false; } bool StatelessReader::matched_writer_is_matched( @@ -228,11 +231,6 @@ bool StatelessReader::matched_writer_is_matched( return true; } - if (is_datasharing_compatible_) - { - return datasharing_listener_->writer_is_matched(writer_guid); - } - return false; } @@ -249,14 +247,6 @@ bool StatelessReader::change_received( update_last_notified(change->writerGUID, change->sequenceNumber); ++total_unread_; - ReaderPool* datasharing_pool = dynamic_cast(change->payload_owner()); - if (datasharing_pool) - { - // Change was added to the history. May need to update datasharing ACK timestamp - // because we can receive changes in a different order (due to processing of writers or late-joiners) - datasharing_listener_->change_added_with_timestamp(change->sourceTimestamp.to_ns()); - } - if (getListener() != nullptr) { getListener()->onNewCacheChangeAdded(this, change); @@ -304,38 +294,7 @@ bool StatelessReader::nextUntakenCache( WriterProxy** /*wpout*/) { std::lock_guard guard(mp_mutex); - bool found = false; - - while (mp_history->get_min_change(change)) - { - if (is_datasharing_compatible_ && datasharing_listener_->writer_is_matched((*change)->writerGUID)) - { - // Lock the payload. The lock will NOT be freed for the returned change - DataSharingPayloadPool::shared_mutex((*change)->serializedPayload.data).lock_sharable(); - - //Check if the payload is dirty - if (DataSharingPayloadPool::check_sequence_number( - (*change)->serializedPayload.data, (*change)->sequenceNumber)) - { - found = true; - break; - } - - // Unlock, remove and continue - DataSharingPayloadPool::shared_mutex((*change)->serializedPayload.data).unlock_sharable(); - logWarning(RTPS_READER, - "Removing change " << (*change)->sequenceNumber << " from " << (*change)->writerGUID << - " because is overidden"); - mp_history->remove_change(*change); - } - else - { - found = true; - break; - } - } - - return found; + return mp_history->get_min_change(change); } bool StatelessReader::nextUnreadCache( @@ -353,31 +312,8 @@ bool StatelessReader::nextUnreadCache( continue; } - if (is_datasharing_compatible_ && datasharing_listener_->writer_is_matched((*it)->writerGUID)) - { - // Lock the payload. The lock will NOT be freed for the returned change - DataSharingPayloadPool::shared_mutex((*it)->serializedPayload.data).lock_sharable(); - - //Check if the payload is dirty - if (DataSharingPayloadPool::check_sequence_number( - (*it)->serializedPayload.data, (*it)->sequenceNumber)) - { - found = true; - break; - } - - // Unlock, remove and continue - DataSharingPayloadPool::shared_mutex((*it)->serializedPayload.data).unlock_sharable(); - logWarning(RTPS_READER, - "Removing change " << (*it)->sequenceNumber << " from " << (*it)->writerGUID << - " because is overidden"); - it = mp_history->remove_change(it); - } - else - { - found = true; - break; - } + found = true; + break; } if (found) @@ -408,31 +344,11 @@ bool StatelessReader::change_removed_by_history( } bool StatelessReader::begin_sample_access_nts( - CacheChange_t* change, + CacheChange_t* /*change*/, WriterProxy*& /*wp*/, bool& is_future_change) { - const GUID_t& writer_guid = change->writerGUID; is_future_change = false; - - if (is_datasharing_compatible_ && datasharing_listener_->writer_is_matched(writer_guid)) - { - // Lock the payload. Will remain locked until end_sample_access_nts is called - DataSharingPayloadPool::shared_mutex(change->serializedPayload.data).lock_sharable(); - - //Check if the payload is dirty - if (!DataSharingPayloadPool::check_sequence_number( - change->serializedPayload.data, change->sequenceNumber)) - { - // Unlock and return false - DataSharingPayloadPool::shared_mutex(change->serializedPayload.data).unlock_sharable(); - logWarning(RTPS_READER, - "Removing change " << change->sequenceNumber << " from " << writer_guid << - " because is overidden"); - return false; - } - } - return true; } @@ -459,37 +375,6 @@ void StatelessReader::change_read_by_user( } } - std::unique_lock lock(mp_mutex); - - // If not datasharing, we are done - if (!is_datasharing_compatible_ || !datasharing_listener_->writer_is_matched(change->writerGUID)) - { - return; - } - - // Unlock the payload - DataSharingPayloadPool::shared_mutex(change->serializedPayload.data).unlock_sharable(); - - if (mark_as_read) - { - // This may not be the change read with highest SN, - // need to find largest SN to ACK - std::vector::iterator last_read_from_writer; - for (std::vector::iterator it = mp_history->changesBegin(); - it != mp_history->changesEnd(); ++it) - { - if (!(*it)->isRead) - { - // Update the last ACK timestamp in the shared memory - datasharing_listener_->change_removed_with_timestamp((*it)->sourceTimestamp.to_ns()); - return; - } - } - - // Must ACK all in the writer - datasharing_listener_->change_removed_with_timestamp( - (*mp_history->changesRbegin())->sourceTimestamp.to_ns() + 1); - } } bool StatelessReader::processDataMsg( @@ -518,9 +403,29 @@ bool StatelessReader::processDataMsg( // Ask payload pool to copy the payload IPayloadPool* payload_owner = change->payload_owner(); - ReaderPool* datasharing_pool = dynamic_cast(payload_owner); - if (datasharing_pool) + + bool is_datasharing = std::any_of(matched_writers_.begin(), matched_writers_.end(), + [&change](const RemoteWriterInfo_t& writer) + { + return (writer.guid == change->writerGUID) && (writer.is_datasharing); + }); + + if (is_datasharing) { + //We may receive the change from the listener (with owner a ReaderPool) or intraprocess (with owner a WriterPool) + ReaderPool* datasharing_pool = dynamic_cast(payload_owner); + if (!datasharing_pool) + { + datasharing_pool = datasharing_listener_->get_pool_for_writer(change->writerGUID).get(); + } + if (!datasharing_pool) + { + logWarning(RTPS_MSG_IN, IDSTRING "Problem copying DataSharing CacheChange from writer " + << change->writerGUID); + change_pool_->release_cache(change_to_add); + return false; + } + datasharing_pool->get_payload(change->serializedPayload, payload_owner, *change_to_add); } else if (payload_pool_->get_payload(change->serializedPayload, payload_owner, *change_to_add)) @@ -565,6 +470,8 @@ bool StatelessReader::processDataFragMsg( { if (writer.guid == writer_guid) { + // Datasharing communication will never send fragments + assert(!writer.is_datasharing); assert_writer_liveliness(writer_guid); // Check if CacheChange was received. @@ -694,11 +601,6 @@ bool StatelessReader::acceptMsgFrom( } } - if (is_datasharing_compatible_ && datasharing_listener_->writer_is_matched(writerId)) - { - return true; - } - return std::any_of(matched_writers_.begin(), matched_writers_.end(), [&writerId](const RemoteWriterInfo_t& writer) { diff --git a/src/cpp/rtps/reader/WriterProxy.cpp b/src/cpp/rtps/reader/WriterProxy.cpp index 0b8de864635..74030bfe2a1 100644 --- a/src/cpp/rtps/reader/WriterProxy.cpp +++ b/src/cpp/rtps/reader/WriterProxy.cpp @@ -87,6 +87,7 @@ WriterProxy::WriterProxy( , ownership_strength_(0) , liveliness_kind_(AUTOMATIC_LIVELINESS_QOS) , locators_entry_(loc_alloc.max_unicast_locators, loc_alloc.max_multicast_locators) + , is_datasharing_writer_(false) { //Create Events ResourceEvent& event_manager = reader_->getRTPSParticipant()->getEventResource(); @@ -111,6 +112,14 @@ WriterProxy::WriterProxy( void WriterProxy::start( const WriterProxyData& attributes, const SequenceNumber_t& initial_sequence) +{ + start(attributes, initial_sequence, false); +} + +void WriterProxy::start( + const WriterProxyData& attributes, + const SequenceNumber_t& initial_sequence, + bool is_datasharing) { #ifdef SHOULD_DEBUG_LINUX assert(get_mutex_owner() == get_thread_id()); @@ -129,7 +138,7 @@ void WriterProxy::start( liveliness_kind_ = attributes.m_qos.m_liveliness.kind; locators_entry_.unicast = attributes.remote_locators().unicast; locators_entry_.multicast = attributes.remote_locators().multicast; - + is_datasharing_writer_ = is_datasharing; initial_acknack_->restart_timer(); loaded_from_storage(initial_sequence); } diff --git a/src/cpp/rtps/reader/WriterProxy.h b/src/cpp/rtps/reader/WriterProxy.h index 3669f17be42..b29c22b3ad1 100644 --- a/src/cpp/rtps/reader/WriterProxy.h +++ b/src/cpp/rtps/reader/WriterProxy.h @@ -80,6 +80,17 @@ class WriterProxy : public RTPSMessageSenderInterface const WriterProxyData& attributes, const SequenceNumber_t& initial_sequence); + /** + * Activate this proxy associating it to a remote writer. + * @param attributes WriterProxyData of the writer for which to keep state. + * @param initial_sequence Sequence number of last acknowledged change. + * @param is_datasharing Whether the writer is datasharing with us or not. + */ + void start( + const WriterProxyData& attributes, + const SequenceNumber_t& initial_sequence, + bool is_datasharing); + /** * Update information on the remote writer. * @param attributes WriterProxyData with updated information of the writer. @@ -321,6 +332,11 @@ class WriterProxy : public RTPSMessageSenderInterface return is_on_same_process_; } + bool is_datasharing_writer() const + { + return is_datasharing_writer_; + } + private: /** @@ -352,7 +368,7 @@ class WriterProxy : public RTPSMessageSenderInterface bool is_alive_; using pool_allocator_t = - foonathan::memory::memory_pool; + foonathan::memory::memory_pool; //! Memory pool allocator for changes_received_ pool_allocator_t changes_pool_; @@ -378,6 +394,8 @@ class WriterProxy : public RTPSMessageSenderInterface GUID_t persistence_guid_; //! Taken from proxy data LocatorSelectorEntry locators_entry_; + //! Is the writer datasharing + bool is_datasharing_writer_; using ChangeIterator = decltype(changes_received_)::iterator; @@ -385,12 +403,12 @@ class WriterProxy : public RTPSMessageSenderInterface int get_mutex_owner() const; int get_thread_id() const; -#endif +#endif // if !defined(NDEBUG) && defined(FASTRTPS_SOURCE) && defined(__linux__) }; } /* namespace rtps */ } /* namespace fastrtps */ } /* namespace eprosima */ -#endif +#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #endif /* FASTRTPS_RTPS_READER_WRITERPROXY_H_ */ diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 0652712fa2f..83f613e05db 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -1552,19 +1552,19 @@ bool StatefulWriter::matched_reader_add( rp->start(rdata, is_datasharing_compatible_with(rdata)); locator_selector_.add_entry(rp->locator_selector_entry()); - if (rp->is_datasharing_reader()) + if (rp->is_local_reader()) { - matched_datasharing_readers_.push_back(rp); + matched_local_readers_.push_back(rp); logInfo(RTPS_WRITER, "Adding reader " << rdata.guid() << " to " << this->m_guid.entityId - << " as data sharing"); + << " as local reader"); } else { - if (rp->is_local_reader()) + if (rp->is_datasharing_reader()) { - matched_local_readers_.push_back(rp); + matched_datasharing_readers_.push_back(rp); logInfo(RTPS_WRITER, "Adding reader " << rdata.guid() << " to " << this->m_guid.entityId - << " as local reader"); + << " as data sharing"); } else { @@ -2575,19 +2575,6 @@ const fastdds::rtps::IReaderDataFilter* StatefulWriter::reader_data_filter() con return reader_data_filter_; } -bool StatefulWriter::is_datasharing_payload_reusable( - const Time_t& source_timestamp) const -{ - for (const ReaderProxy* reader : matched_datasharing_readers_) - { - if (reader->datasharing_notifier()->ack_timestamp() <= source_timestamp.to_ns()) - { - return false; - } - } - return true; -} - } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 2eac1920276..73e7f75ad3d 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -919,8 +919,8 @@ bool StatelessWriter::matched_reader_add( locator_selector_.add_entry(new_reader->locator_selector_entry()); - // Datasharing readers handle transient history themselves - if (!new_reader->is_datasharing_reader() && + // Remote datasharing readers handle transient history themselves + if ((!new_reader->is_datasharing_reader() || new_reader->is_local_reader()) && mp_history->getHistorySize() > 0 && data.m_qos.m_durability.kind >= TRANSIENT_LOCAL_DURABILITY_QOS) { @@ -934,17 +934,17 @@ bool StatelessWriter::matched_reader_add( mp_RTPSParticipant->async_thread().wake_up(this); } - if (new_reader->is_datasharing_reader()) + if (new_reader->is_local_reader()) { - matched_datasharing_readers_.push_back(std::move(new_reader)); + matched_local_readers_.push_back(std::move(new_reader)); logInfo(RTPS_WRITER, "Adding reader " << data.guid() << " to " << this->m_guid.entityId - << " as data sharing"); + << " as local reader"); } - else if (new_reader->is_local_reader()) + else if (new_reader->is_datasharing_reader()) { - matched_local_readers_.push_back(std::move(new_reader)); + matched_datasharing_readers_.push_back(std::move(new_reader)); logInfo(RTPS_WRITER, "Adding reader " << data.guid() << " to " << this->m_guid.entityId - << " as local reader"); + << " as data sharing"); } else { @@ -1088,19 +1088,6 @@ bool StatelessWriter::send( fixed_locators_.end()), max_blocking_time_point); } -bool StatelessWriter::is_datasharing_payload_reusable( - const Time_t& source_timestamp) const -{ - for (const std::unique_ptr& reader : matched_datasharing_readers_) - { - if (reader->datasharing_notifier()->ack_timestamp() <= source_timestamp.to_ns()) - { - return false; - } - } - return true; -} - } /* namespace rtps */ } /* namespace fastrtps */ } /* namespace eprosima */ diff --git a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp index d179e79a769..6ae4e881c87 100644 --- a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp @@ -454,6 +454,13 @@ TEST_P(PubSubHistory, PubSubAsReliableMultithreadKeepLast1) ASSERT_TRUE(reader.isInitialized()); + if (enable_datasharing) + { + // on datasharing we need to give time to the reader to process the data + // before reusing it + writer.resource_limits_extra_samples(200); + } + writer.history_depth(1).init(); ASSERT_TRUE(writer.isInitialized()); diff --git a/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp b/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp index 5390c3cd3a3..2dd3422c21b 100644 --- a/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDataSharing.cpp @@ -117,7 +117,7 @@ TEST(DDSDataSharing, TransientReader) TEST(DDSDataSharing, BestEffortDirtyPayloads) { // The writer's pool is smaller than the reader history. - // The number of samples is larger than the pool size, so some payloads get rused + // The number of samples is larger than the pool size, so some payloads get reused // leaving dirty payloads in the reader PubSubReader read_reader(TEST_TOPIC_NAME, false); PubSubWriter writer(TEST_TOPIC_NAME); @@ -148,21 +148,18 @@ TEST(DDSDataSharing, BestEffortDirtyPayloads) read_reader.wait_discovery(); std::list data = default_fixed_sized_data_generator(writer_sent_data); - std::list data_in_history; + std::list valid_data; auto data_it = data.begin(); std::advance(data_it, writer_sent_data - writer_history_depth - 1); - std::copy(data_it, data.end(), std::back_inserter(data_in_history)); + std::copy(data_it, data.end(), std::back_inserter(valid_data)); // Send the data to fill the history and overwrite old changes - // The reader will receive and process all changes so that the writer can reuse them, - // but will keep them in the history. - read_reader.startReception(data); writer.send(data, 100); ASSERT_TRUE(data.empty()); - read_reader.block_for_all(); - // Doing a second read on the same history, the application will see only the last samples - read_reader.check_history_content(data_in_history); + // The reader has overridden payloads in the history. Only the valid ones are returned to the user + read_reader.startReception(valid_data); + read_reader.block_for_all(); } TEST(DDSDataSharing, ReliableDirtyPayloads) @@ -191,7 +188,7 @@ TEST(DDSDataSharing, ReliableDirtyPayloads) read_reader.history_depth(writer_sent_data) .add_user_transport_to_pparams(testTransport) .datasharing_on("Unused. change when ready") - .reliability(BEST_EFFORT_RELIABILITY_QOS).init(); + .reliability(RELIABLE_RELIABILITY_QOS).init(); ASSERT_TRUE(read_reader.isInitialized()); @@ -199,10 +196,10 @@ TEST(DDSDataSharing, ReliableDirtyPayloads) read_reader.wait_discovery(); std::list data = default_fixed_sized_data_generator(writer_sent_data); - std::list data_in_history; + std::list valid_data; auto data_it = data.begin(); std::advance(data_it, writer_sent_data - writer_history_depth - 1); - std::copy(data_it, data.end(), std::back_inserter(data_in_history)); + std::copy(data_it, data.end(), std::back_inserter(valid_data)); // Send the data to fill the history and overwrite old changes // The reader will receive and process all changes so that the writer can reuse them, @@ -213,7 +210,14 @@ TEST(DDSDataSharing, ReliableDirtyPayloads) read_reader.block_for_all(); // Doing a second read on the same history, the application will see only the last samples - read_reader.check_history_content(data_in_history); + while (!valid_data.empty()) + { + FixedSized value; + ASSERT_TRUE(read_reader.take_first_data(&value)); + ASSERT_EQ(valid_data.front(), value); + valid_data.pop_front(); + } + ASSERT_TRUE(valid_data.empty()); } TEST(DDSDataSharing, DataSharingWriter_DifferentDomainReaders) @@ -493,4 +497,4 @@ TEST(DDSDataSharing, DataSharingPoolError) writer_auto.send(data); ASSERT_TRUE(data.empty()); reader.block_for_all(); -} \ No newline at end of file +} diff --git a/test/mock/rtps/DataSharingPayloadPool/rtps/DataSharing/DataSharingPayloadPool.hpp b/test/mock/rtps/DataSharingPayloadPool/rtps/DataSharing/DataSharingPayloadPool.hpp index 37de9453293..e336cd4ed69 100644 --- a/test/mock/rtps/DataSharingPayloadPool/rtps/DataSharing/DataSharingPayloadPool.hpp +++ b/test/mock/rtps/DataSharingPayloadPool/rtps/DataSharing/DataSharingPayloadPool.hpp @@ -153,12 +153,10 @@ class DataSharingPayloadPool : public IPayloadPool return true; } - template - bool wait_until( - const std::chrono::time_point& /*max_blocking_time*/, - const ConditionFunctor& condition) + bool is_sample_valid( + const CacheChange_t& /*change*/) const { - return condition(); + return true; } protected: diff --git a/test/mock/rtps/DataSharingPayloadPool/rtps/DataSharing/ReaderPool.hpp b/test/mock/rtps/DataSharingPayloadPool/rtps/DataSharing/ReaderPool.hpp new file mode 100644 index 00000000000..4577e99a36e --- /dev/null +++ b/test/mock/rtps/DataSharingPayloadPool/rtps/DataSharing/ReaderPool.hpp @@ -0,0 +1,40 @@ +// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file ReaderPool.hpp + */ + +#ifndef RTPS_DATASHARING_READERPOOL_HPP +#define RTPS_DATASHARING_READERPOOL_HPP + +#include + +#include + +namespace eprosima { +namespace fastrtps { +namespace rtps { + +class ReaderPool : public DataSharingPayloadPool +{ + + +}; + +} // namespace rtps +} // namespace fastrtps +} // namespace eprosima + +#endif // RTPS_DATASHARING_DATASHARINGPAYLOADPOOLIMPL_READERPOOL_HPP diff --git a/test/unittest/dds/subscriber/DataReaderTests.cpp b/test/unittest/dds/subscriber/DataReaderTests.cpp index 7995eff048d..2392878d4b5 100644 --- a/test/unittest/dds/subscriber/DataReaderTests.cpp +++ b/test/unittest/dds/subscriber/DataReaderTests.cpp @@ -1321,6 +1321,145 @@ TEST_F(DataReaderTests, read_unread) } } +/* + * This type fails deserialization on odd samples + */ +class FailingFooTypeSupport : public FooTypeSupport +{ + +public: + + FailingFooTypeSupport() + : FooTypeSupport() + { + } + + bool deserialize( + fastrtps::rtps::SerializedPayload_t* payload, + void* data) override + { + //Convert DATA to pointer of your type + FooType* p_type = static_cast(data); + + bool ret_val = FooTypeSupport::deserialize(payload, p_type); + if (p_type->message()[0] % 2) + { + return false; + } + return ret_val; + } + +}; + +/* + * This test checks that the behaviour of the read/take calls when deserialization fails. + */ +TEST_F(DataReaderTests, Deserialization_errors) +{ + type_.reset(new FailingFooTypeSupport()); + + static const Duration_t time_to_wait(0, 100 * 1000 * 1000); + static constexpr int32_t num_samples = 10; + + const ReturnCode_t& ok_code = ReturnCode_t::RETCODE_OK; + const ReturnCode_t& no_data_code = ReturnCode_t::RETCODE_NO_DATA; + + DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT; + writer_qos.history().kind = KEEP_LAST_HISTORY_QOS; + writer_qos.history().depth = num_samples; + writer_qos.publish_mode().kind = SYNCHRONOUS_PUBLISH_MODE; + writer_qos.reliability().kind = RELIABLE_RELIABILITY_QOS; + + DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT; + reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS; + reader_qos.history().kind = KEEP_ALL_HISTORY_QOS; + reader_qos.resource_limits().max_instances = 1; + reader_qos.resource_limits().max_samples_per_instance = num_samples; + reader_qos.resource_limits().max_samples = 3 * num_samples; + + create_instance_handles(); + create_entities(nullptr, reader_qos, SUBSCRIBER_QOS_DEFAULT, writer_qos); + + FooType data; + data.index(1); + data.message()[1] = '\0'; + + // Check deserialization errors without loans + { + // Send a bunch of samples + for (char i = 0; i < num_samples; ++i) + { + data.message()[0] = i + '0'; + EXPECT_EQ(ok_code, data_writer_->write(&data, handle_ok_)); + } + + // There are unread samples, so wait_for_unread should be ok + EXPECT_TRUE(data_reader_->wait_for_unread_message(time_to_wait)); + + { + FooSeq data_seq(num_samples); + SampleInfoSeq info_seq(num_samples); + + // Reader should have 10 samples with the following states (R = read, N = not-read, / = removed from history) + // {N, N, N, N, N, N, N, N, N, N} + + // This should return samples 0, 2, 4, 6, and 8 + EXPECT_EQ(ok_code, data_reader_->read(data_seq, info_seq, num_samples)); + check_collection(data_seq, true, num_samples, num_samples / 2); + check_sample_values(data_seq, "02468"); + } + + { + FooSeq data_seq(num_samples); + SampleInfoSeq info_seq(num_samples); + + // Reader sample states should be + // {R, /, R, /, R, /, R, /, R, /} + + // There are not unread samples in the history + EXPECT_EQ(no_data_code, data_reader_->take(data_seq, info_seq, num_samples, NOT_READ_SAMPLE_STATE)); + } + + { + FooSeq data_seq(num_samples); + SampleInfoSeq info_seq(num_samples); + + // This should return samples 0, 2, 4, 6, and 8 (just for cleaning) + EXPECT_EQ(ok_code, data_reader_->take(data_seq, info_seq, num_samples, READ_SAMPLE_STATE)); + check_collection(data_seq, true, num_samples, num_samples / 2); + check_sample_values(data_seq, "02468"); + } + } + + // Check deserialization errors with loans (loaned samples are not deserialized) + { + // Send a bunch of samples + for (char i = 0; i < num_samples; ++i) + { + data.message()[0] = i + '0'; + EXPECT_EQ(ok_code, data_writer_->write(&data, handle_ok_)); + } + + // There are unread samples, so wait_for_unread should be ok + EXPECT_TRUE(data_reader_->wait_for_unread_message(time_to_wait)); + + { + FooSeq data_seq; + SampleInfoSeq info_seq; + + // Reader should have 10 samples with the following states (R = read, N = not-read, / = removed from history) + // {N, N, N, N, N, N, N, N, N, N} + + // This should return samples 0 to 9 + EXPECT_EQ(ok_code, data_reader_->read(data_seq, info_seq, num_samples)); + check_collection(data_seq, false, num_samples, num_samples); + check_sample_values(data_seq, "0123456789"); + EXPECT_EQ(ok_code, data_reader_->return_loan(data_seq, info_seq)); + } + } + +} + TEST_F(DataReaderTests, TerminateWithoutDestroyingReader) { destroy_entities_ = false;