From 398713b01e9348a684a510b1eb2f8e47c60cee1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Thu, 23 Jan 2020 11:44:06 +0100 Subject: [PATCH 1/9] Refs #7399. Applying coding style. --- src/cpp/publisher/PublisherImpl.cpp | 158 ++++++++++++------------ src/cpp/rtps/writer/StatelessWriter.cpp | 50 ++++---- 2 files changed, 108 insertions(+), 100 deletions(-) diff --git a/src/cpp/publisher/PublisherImpl.cpp b/src/cpp/publisher/PublisherImpl.cpp index 2c0baacd423..ccb0d1cae58 100644 --- a/src/cpp/publisher/PublisherImpl.cpp +++ b/src/cpp/publisher/PublisherImpl.cpp @@ -60,7 +60,7 @@ PublisherImpl::PublisherImpl( , m_history(this, pdatatype->m_typeSize #if HAVE_SECURITY // In future v2 changepool is in writer, and writer set this value to cachechagepool. - + 20 /*SecureDataHeader*/ + 4 + ((2* 16) /*EVP_MAX_IV_LENGTH max block size*/ - 1 ) /* SecureDataBodey*/ + + 20 /*SecureDataHeader*/ + 4 + ((2 * 16) /*EVP_MAX_IV_LENGTH max block size*/ - 1 ) /* SecureDataBodey*/ + 16 + 4 /*SecureDataTag*/ #endif , att.topic.historyQos, att.topic.resourceLimitsQos, att.historyMemoryPolicy) @@ -76,18 +76,18 @@ PublisherImpl::PublisherImpl( , lifespan_duration_us_(m_att.qos.m_lifespan.duration.to_ns() * 1e-3) { deadline_timer_ = new TimedEvent(mp_participant->get_resource_event(), - [&]() -> bool - { - return deadline_missed(); - }, - att.qos.m_deadline.period.to_ns() * 1e-6); + [&]() -> bool + { + return deadline_missed(); + }, + att.qos.m_deadline.period.to_ns() * 1e-6); lifespan_timer_ = new TimedEvent(mp_participant->get_resource_event(), - [&]() -> bool - { - return lifespan_expired(); - }, - m_att.qos.m_lifespan.duration.to_ns() * 1e-6); + [&]() -> bool + { + return lifespan_expired(); + }, + m_att.qos.m_lifespan.duration.to_ns() * 1e-6); } PublisherImpl::~PublisherImpl() @@ -95,7 +95,7 @@ PublisherImpl::~PublisherImpl() delete(lifespan_timer_); delete(deadline_timer_); - if(mp_writer != nullptr) + if (mp_writer != nullptr) { logInfo(PUBLISHER, this->getGuid().entityId << " in topic: " << this->m_att.topic.topicName); } @@ -104,8 +104,6 @@ PublisherImpl::~PublisherImpl() delete(this->mp_userPublisher); } - - bool PublisherImpl::create_new_change( ChangeKind_t changeKind, void* data) @@ -127,79 +125,79 @@ bool PublisherImpl::create_new_change_with_params( return false; } - if(changeKind == NOT_ALIVE_UNREGISTERED || changeKind == NOT_ALIVE_DISPOSED || + if (changeKind == NOT_ALIVE_UNREGISTERED || changeKind == NOT_ALIVE_DISPOSED || changeKind == NOT_ALIVE_DISPOSED_UNREGISTERED) { - if(m_att.topic.topicKind == NO_KEY) + if (m_att.topic.topicKind == NO_KEY) { - logError(PUBLISHER,"Topic is NO_KEY, operation not permitted"); + logError(PUBLISHER, "Topic is NO_KEY, operation not permitted"); return false; } } InstanceHandle_t handle; - if(m_att.topic.topicKind == WITH_KEY) + if (m_att.topic.topicKind == WITH_KEY) { bool is_key_protected = false; #if HAVE_SECURITY is_key_protected = mp_writer->getAttributes().security_attributes().is_key_protected; #endif - mp_type->getKey(data,&handle,is_key_protected); + mp_type->getKey(data, &handle, is_key_protected); } // Block lowlevel writer auto max_blocking_time = std::chrono::steady_clock::now() + - std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time)); + std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time)); #if HAVE_STRICT_REALTIME std::unique_lock lock(mp_writer->getMutex(), std::defer_lock); - 
if(lock.try_lock_until(max_blocking_time)) + if (lock.try_lock_until(max_blocking_time)) #else std::unique_lock lock(mp_writer->getMutex()); #endif { CacheChange_t* ch = mp_writer->new_change(mp_type->getSerializedSizeProvider(data), changeKind, handle); - if(ch != nullptr) + if (ch != nullptr) { - if(changeKind == ALIVE) + if (changeKind == ALIVE) { //If these two checks are correct, we asume the cachechange is valid and thwn we can write to it. - if(!mp_type->serialize(data, &ch->serializedPayload)) + if (!mp_type->serialize(data, &ch->serializedPayload)) { - logWarning(RTPS_WRITER,"RTPSWriter:Serialization returns false";); + logWarning(RTPS_WRITER, "RTPSWriter:Serialization returns false"; ); m_history.release_Cache(ch); return false; } } //TODO(Ricardo) This logic in a class. Then a user of rtps layer can use it. - if(high_mark_for_frag_ == 0) + if (high_mark_for_frag_ == 0) { uint32_t max_data_size = mp_writer->getMaxDataSize(); uint32_t writer_throughput_controller_bytes = - mp_writer->calculateMaxDataSize(m_att.throughputController.bytesPerPeriod); + mp_writer->calculateMaxDataSize(m_att.throughputController.bytesPerPeriod); uint32_t participant_throughput_controller_bytes = - mp_writer->calculateMaxDataSize( - mp_rtpsParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod); + mp_writer->calculateMaxDataSize( + mp_rtpsParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod); high_mark_for_frag_ = - max_data_size > writer_throughput_controller_bytes ? - writer_throughput_controller_bytes : - (max_data_size > participant_throughput_controller_bytes ? - participant_throughput_controller_bytes : - max_data_size); + max_data_size > writer_throughput_controller_bytes ? + writer_throughput_controller_bytes : + (max_data_size > participant_throughput_controller_bytes ? + participant_throughput_controller_bytes : + max_data_size); } uint32_t final_high_mark_for_frag = high_mark_for_frag_; // If needed inlineqos for related_sample_identity, then remove the inlinqos size from final fragment size. - if(wparams.related_sample_identity() != SampleIdentity::unknown()) + if (wparams.related_sample_identity() != SampleIdentity::unknown()) { final_high_mark_for_frag -= 32; } // If it is big data, fragment it. - if(ch->serializedPayload.length > final_high_mark_for_frag) + if (ch->serializedPayload.length > final_high_mark_for_frag) { /// Fragment the data. // Set the fragment size to the cachechange. 
@@ -208,7 +206,7 @@ bool PublisherImpl::create_new_change_with_params( } InstanceHandle_t change_handle = ch->instanceHandle; - if(!this->m_history.add_pub_change(ch, wparams, lock, max_blocking_time)) + if (!this->m_history.add_pub_change(ch, wparams, lock, max_blocking_time)) { m_history.release_Cache(ch); return false; @@ -237,7 +235,8 @@ bool PublisherImpl::create_new_change_with_params( if (m_att.qos.m_lifespan.duration != c_TimeInfinite) { - lifespan_duration_us_ = std::chrono::duration>(m_att.qos.m_lifespan.duration.to_ns() * 1e-3); + lifespan_duration_us_ = std::chrono::duration >( + m_att.qos.m_lifespan.duration.to_ns() * 1e-3); lifespan_timer_->update_interval_millisec(m_att.qos.m_lifespan.duration.to_ns() * 1e-6); lifespan_timer_->restart_timer(); } @@ -249,13 +248,13 @@ bool PublisherImpl::create_new_change_with_params( return false; } - bool PublisherImpl::removeMinSeqChange() { return m_history.removeMinChange(); } -bool PublisherImpl::removeAllChange(size_t* removed) +bool PublisherImpl::removeAllChange( + size_t* removed) { return m_history.removeAllChange(removed); } @@ -264,84 +263,86 @@ const GUID_t& PublisherImpl::getGuid() { return mp_writer->getGuid(); } + // -bool PublisherImpl::updateAttributes(const PublisherAttributes& att) +bool PublisherImpl::updateAttributes( + const PublisherAttributes& att) { bool updated = true; bool missing = false; - if(this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS) + if (this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS) { - if(att.unicastLocatorList.size() != this->m_att.unicastLocatorList.size() || + if (att.unicastLocatorList.size() != this->m_att.unicastLocatorList.size() || att.multicastLocatorList.size() != this->m_att.multicastLocatorList.size()) { - logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version"); + logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version"); updated &= false; } else { - for(LocatorListConstIterator lit1 = this->m_att.unicastLocatorList.begin(); - lit1!=this->m_att.unicastLocatorList.end();++lit1) + for (LocatorListConstIterator lit1 = this->m_att.unicastLocatorList.begin(); + lit1 != this->m_att.unicastLocatorList.end(); ++lit1) { missing = true; - for(LocatorListConstIterator lit2 = att.unicastLocatorList.begin(); - lit2!= att.unicastLocatorList.end();++lit2) + for (LocatorListConstIterator lit2 = att.unicastLocatorList.begin(); + lit2 != att.unicastLocatorList.end(); ++lit2) { - if(*lit1 == *lit2) + if (*lit1 == *lit2) { missing = false; break; } } - if(missing) + if (missing) { - logWarning(PUBLISHER,"Locator: "<< *lit1 << " not present in new list"); - logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version"); + logWarning(PUBLISHER, "Locator: " << *lit1 << " not present in new list"); + logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version"); } } - for(LocatorListConstIterator lit1 = this->m_att.multicastLocatorList.begin(); - lit1!=this->m_att.multicastLocatorList.end();++lit1) + for (LocatorListConstIterator lit1 = this->m_att.multicastLocatorList.begin(); + lit1 != this->m_att.multicastLocatorList.end(); ++lit1) { missing = true; - for(LocatorListConstIterator lit2 = att.multicastLocatorList.begin(); - lit2!= att.multicastLocatorList.end();++lit2) + for (LocatorListConstIterator lit2 = att.multicastLocatorList.begin(); + lit2 != att.multicastLocatorList.end(); ++lit2) { - if(*lit1 == *lit2) + if (*lit1 == *lit2) { missing = false; break; } } - if(missing) + if 
(missing) { - logWarning(PUBLISHER,"Locator: "<< *lit1<< " not present in new list"); - logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version"); + logWarning(PUBLISHER, "Locator: " << *lit1 << " not present in new list"); + logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version"); } } } } //TOPIC ATTRIBUTES - if(this->m_att.topic != att.topic) + if (this->m_att.topic != att.topic) { - logWarning(PUBLISHER,"Topic Attributes cannot be updated"); + logWarning(PUBLISHER, "Topic Attributes cannot be updated"); updated &= false; } //QOS: //CHECK IF THE QOS CAN BE SET - if(!this->m_att.qos.canQosBeUpdated(att.qos)) + if (!this->m_att.qos.canQosBeUpdated(att.qos)) { - updated &=false; + updated &= false; } - if(updated) + if (updated) { - if(this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS) + if (this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS) { //UPDATE TIMES: StatefulWriter* sfw = (StatefulWriter*)mp_writer; sfw->updateTimes(att.times); } - this->m_att.qos.setQos(att.qos,false); + this->m_att.qos.setQos(att.qos, false); this->m_att = att; //Notify the participant that a Writer has changed its QOS mp_rtpsParticipant->updateWriter(this->mp_writer, m_att.topic, m_att.qos); @@ -351,7 +352,7 @@ bool PublisherImpl::updateAttributes(const PublisherAttributes& att) if (m_att.qos.m_deadline.period != c_TimeInfinite) { deadline_duration_us_ = - duration>(m_att.qos.m_deadline.period.to_ns() * 1e-3); + duration >(m_att.qos.m_deadline.period.to_ns() * 1e-3); deadline_timer_->update_interval_millisec(m_att.qos.m_deadline.period.to_ns() * 1e-6); } else @@ -364,7 +365,7 @@ bool PublisherImpl::updateAttributes(const PublisherAttributes& att) if (m_att.qos.m_lifespan.duration != c_TimeInfinite) { lifespan_duration_us_ = - duration>(m_att.qos.m_lifespan.duration.to_ns() * 1e-3); + duration >(m_att.qos.m_lifespan.duration.to_ns() * 1e-3); lifespan_timer_->update_interval_millisec(m_att.qos.m_lifespan.duration.to_ns() * 1e-6); } else @@ -380,7 +381,7 @@ void PublisherImpl::PublisherWriterListener::onWriterMatched( RTPSWriter* /*writer*/, MatchingInfo& info) { - if( mp_publisherImpl->mp_listener != nullptr ) + if ( mp_publisherImpl->mp_listener != nullptr ) { mp_publisherImpl->mp_listener->onPublicationMatched(mp_publisherImpl->mp_userPublisher, info); } @@ -397,20 +398,21 @@ void PublisherImpl::PublisherWriterListener::onWriterChangeReceivedByAll( } void PublisherImpl::PublisherWriterListener::on_liveliness_lost( - RTPSWriter *writer, - const LivelinessLostStatus &status) + RTPSWriter* writer, + const LivelinessLostStatus& status) { (void)writer; if (mp_publisherImpl->mp_listener != nullptr) { mp_publisherImpl->mp_listener->on_liveliness_lost( - mp_publisherImpl->mp_userPublisher, - status); + mp_publisherImpl->mp_userPublisher, + status); } } -bool PublisherImpl::wait_for_all_acked(const eprosima::fastrtps::Time_t& max_wait) +bool PublisherImpl::wait_for_all_acked( + const eprosima::fastrtps::Time_t& max_wait) { return mp_writer->wait_for_all_acked(max_wait); } @@ -457,7 +459,8 @@ bool PublisherImpl::deadline_missed() return deadline_timer_reschedule(); } -void PublisherImpl::get_offered_deadline_missed_status(OfferedDeadlineMissedStatus &status) +void PublisherImpl::get_offered_deadline_missed_status( + OfferedDeadlineMissedStatus& status) { std::unique_lock lock(mp_writer->getMutex()); @@ -506,7 +509,8 @@ bool PublisherImpl::lifespan_expired() return true; } -void PublisherImpl::get_liveliness_lost_status(LivelinessLostStatus 
&status) +void PublisherImpl::get_liveliness_lost_status( + LivelinessLostStatus& status) { std::unique_lock lock(mp_writer->getMutex()); diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 4d34a7dea65..1f5f1fc4bb1 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -178,18 +178,20 @@ void StatelessWriter::unsent_change_added_to_history( { RTPSMessageGroup group(mp_RTPSParticipant, this, it, max_blocking_time); - if(change->getFragmentCount() > 0) + if (change->getFragmentCount() > 0) { ChangeForReader_t change_for_reader(change); - change_for_reader.getUnsentFragments().for_each([this, &group, change](FragmentNumber_t fragment_number) - { - if (!group.add_data_frag(*change, fragment_number, - is_inline_qos_expected_)) - { - logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber << - ", " << fragment_number << ")"); - } - }); + change_for_reader.getUnsentFragments().for_each([this, &group, + change](FragmentNumber_t fragment_number) + { + if (!group.add_data_frag(*change, fragment_number, + is_inline_qos_expected_)) + { + logError(RTPS_WRITER, + "Error sending fragment (" << change->sequenceNumber << + ", " << fragment_number << ")"); + } + }); } else { @@ -211,22 +213,24 @@ void StatelessWriter::unsent_change_added_to_history( } } - if(there_are_remote_readers_ || !fixed_locators_.empty()) + if (there_are_remote_readers_ || !fixed_locators_.empty()) { RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time); - if(change->getFragmentCount() > 0) + if (change->getFragmentCount() > 0) { ChangeForReader_t change_for_reader(change); - change_for_reader.getUnsentFragments().for_each([this, &group, change](FragmentNumber_t fragment_number) - { - if (!group.add_data_frag(*change, fragment_number, - is_inline_qos_expected_)) - { - logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber << - ", " << fragment_number << ")"); - } - }); + change_for_reader.getUnsentFragments().for_each([this, &group, + change](FragmentNumber_t fragment_number) + { + if (!group.add_data_frag(*change, fragment_number, + is_inline_qos_expected_)) + { + logError(RTPS_WRITER, + "Error sending fragment (" << change->sequenceNumber << + ", " << fragment_number << ")"); + } + }); } else { @@ -414,10 +418,10 @@ void StatelessWriter::send_any_unsent_changes() if (changeToSend.fragmentNumber != 0) { if (!group.add_data_frag(*changeToSend.cacheChange, changeToSend.fragmentNumber, - is_inline_qos_expected_)) + is_inline_qos_expected_)) { logError(RTPS_WRITER, "Error sending fragment (" << changeToSend.sequenceNumber << - ", " << changeToSend.fragmentNumber << ")"); + ", " << changeToSend.fragmentNumber << ")"); } } else From 8f0c264d406ad48ab251e6ed047190c293d7fc9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Thu, 23 Jan 2020 11:49:16 +0100 Subject: [PATCH 2/9] Refs #7399. Fixed error sending fragments. Fixed async thread is not awaken when sending more than 256 fragments. 
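Context for this fix (reviewer sketch, not part of the commit): the unsent fragments of a change are tracked in an RTPS FragmentNumberSet, whose bitmap only spans a window of 256 fragment numbers, so a sample split into more than 256 fragments cannot be fully described in one pass and the asynchronous sender has to be woken again; that is what the new boolean return of update_unsent_changes() signals. The patch also rounds high_mark_for_frag_ down to a multiple of 4 (high_mark_for_frag_ &= ~3) so fragment payloads stay 4-byte aligned. A minimal standalone illustration of both details; the 1023-byte limit and the 300 KB payload are hypothetical numbers, not values taken from the code:

    #include <cstdint>
    #include <cstdio>

    int main()
    {
        uint32_t high_mark = 1023;   // hypothetical transport limit on fragment size
        high_mark &= ~3u;            // round down to a multiple of 4 -> 1020

        const uint32_t payload = 300u * 1024u;                            // a 300 KB sample
        const uint32_t fragments = (payload + high_mark - 1) / high_mark; // ceiling division

        // More than 256 fragments: a single FragmentNumberSet window cannot
        // describe them all, so a later pass (the woken async thread) is needed.
        std::printf("fragment size %u -> %u fragments (bitmap window is 256)\n",
                high_mark, fragments);
        return 0;
    }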
--- include/fastrtps/rtps/writer/StatelessWriter.h | 2 +- src/cpp/publisher/PublisherImpl.cpp | 1 + src/cpp/rtps/writer/StatelessWriter.cpp | 14 ++++++++++++-- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/include/fastrtps/rtps/writer/StatelessWriter.h b/include/fastrtps/rtps/writer/StatelessWriter.h index ad2ad05747d..e01ba699241 100644 --- a/include/fastrtps/rtps/writer/StatelessWriter.h +++ b/include/fastrtps/rtps/writer/StatelessWriter.h @@ -114,7 +114,7 @@ class StatelessWriter : public RTPSWriter bool set_fixed_locators( const LocatorList_t& locator_list); - void update_unsent_changes( + bool update_unsent_changes( const SequenceNumber_t& seq_num, const FragmentNumber_t& frag_num); diff --git a/src/cpp/publisher/PublisherImpl.cpp b/src/cpp/publisher/PublisherImpl.cpp index ccb0d1cae58..255d6a8f22e 100644 --- a/src/cpp/publisher/PublisherImpl.cpp +++ b/src/cpp/publisher/PublisherImpl.cpp @@ -186,6 +186,7 @@ bool PublisherImpl::create_new_change_with_params( (max_data_size > participant_throughput_controller_bytes ? participant_throughput_controller_bytes : max_data_size); + high_mark_for_frag_ &= ~3; } uint32_t final_high_mark_for_frag = high_mark_for_frag_; diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 1f5f1fc4bb1..19fb48c9626 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -331,10 +331,11 @@ bool StatelessWriter::is_acked_by_all( return true; } -void StatelessWriter::update_unsent_changes( +bool StatelessWriter::update_unsent_changes( const SequenceNumber_t& seq_num, const FragmentNumber_t& frag_num) { + bool has_to_wake_async_thread = false; auto find_by_seq_num = [seq_num](const ChangeForReader_t& unsent_change) { return seq_num == unsent_change.getSequenceNumber(); @@ -355,7 +356,13 @@ void StatelessWriter::update_unsent_changes( { unsent_changes_.remove_if(find_by_seq_num); } + else + { + has_to_wake_async_thread = true; + } } + + return has_to_wake_async_thread; } void StatelessWriter::send_any_unsent_changes() @@ -410,7 +417,10 @@ void StatelessWriter::send_any_unsent_changes() // Remove the messages selected for sending from the original list, // and update those that were fragmented with the new sent index - update_unsent_changes(changeToSend.sequenceNumber, changeToSend.fragmentNumber); + if (update_unsent_changes(changeToSend.sequenceNumber, changeToSend.fragmentNumber)) + { + mp_RTPSParticipant->async_thread().wake_up(this); + } // Notify the controllers FlowController::NotifyControllersChangeSent(changeToSend.cacheChange); From 2d7c9d6c9a74a8273acb39d8daff21b2c071f827 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 24 Jan 2020 10:36:22 +0100 Subject: [PATCH 3/9] Refs #7399. Added blackbox tests. 
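Reviewer note on the do_fragment_test() helper added below: the *SmallFragments variants force fragmentation by replacing the builtin transport with a user transport whose maxMessageSize is about 1 KiB, so every 300 KB sample has to travel as roughly 300 DATA_FRAG submessages, which is exactly the more-than-256-fragments case the previous commit fixes. A minimal sketch of that transport setup, assuming the descriptor created by the test is a UDPv4TransportDescriptor (the member names are the ones assigned in the test code; the type and header path are an assumption):

    #include <fastrtps/transport/UDPv4TransportDescriptor.h> // assumed header/type
    #include <memory>

    // Build a transport that caps every RTPS datagram at ~1 KiB so a 300 KB
    // sample must be split into many fragments.
    std::shared_ptr<eprosima::fastrtps::rtps::UDPv4TransportDescriptor>
    make_small_fragment_transport()
    {
        auto transport = std::make_shared<eprosima::fastrtps::rtps::UDPv4TransportDescriptor>();
        transport->sendBufferSize = 1024;      // socket send buffer
        transport->maxMessageSize = 1024;      // upper bound for one datagram
        transport->receiveBufferSize = 65536;  // keep the receiving socket roomy
        return transport;
    }

The writer side then disables the builtin transport and registers this descriptor through add_user_transport_to_pparams(), as the helper below does.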
--- .../blackbox/BlackboxTestsPubSubFragments.cpp | 144 ++++++++++++------ 1 file changed, 96 insertions(+), 48 deletions(-) diff --git a/test/blackbox/BlackboxTestsPubSubFragments.cpp b/test/blackbox/BlackboxTestsPubSubFragments.cpp index 0481de2ea2a..4ab7f35d15c 100644 --- a/test/blackbox/BlackboxTestsPubSubFragments.cpp +++ b/test/blackbox/BlackboxTestsPubSubFragments.cpp @@ -49,75 +49,123 @@ class PubSubFragments : public testing::TestWithParam } } -}; +protected: + + void do_fragment_test( + const std::string& topic_name, + std::list& data, + bool asynchronous, + bool reliable, + bool small_fragments) + { + PubSubReader reader(topic_name); + PubSubWriter writer(topic_name); -TEST_P(PubSubFragments, PubSubAsNonReliableData300kb) -{ - // Mutes an expected error - Log::SetErrorStringFilter(std::regex("^((?!Big data).)*$")); + reader + .socket_buffer_size(1048576) // accomodate large and fast fragments + .history_depth(data.size()) + .reliability(reliable ? + eprosima::fastrtps::RELIABLE_RELIABILITY_QOS : + eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) + .init(); - PubSubReader reader(TEST_TOPIC_NAME); - PubSubWriter writer(TEST_TOPIC_NAME); + ASSERT_TRUE(reader.isInitialized()); - //Increasing reception buffer to accomodate large and fast fragments - reader.socket_buffer_size(1048576).init(); + if (small_fragments) + { + auto testTransport = std::make_shared(); + testTransport->sendBufferSize = 1024; + testTransport->maxMessageSize = 1024; + testTransport->receiveBufferSize = 65536; + writer.disable_builtin_transport(); + writer.add_user_transport_to_pparams(testTransport); + } - ASSERT_TRUE(reader.isInitialized()); + if (asynchronous) + { + writer.asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE); + } - writer.reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS).init(); + writer + .history_depth(data.size()) + .reliability(reliable ? + eprosima::fastrtps::RELIABLE_RELIABILITY_QOS : + eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) + .init(); - ASSERT_TRUE(writer.isInitialized()); + ASSERT_TRUE(writer.isInitialized()); - // Because its volatile the durability - // Wait for discovery. - writer.wait_discovery(); - reader.wait_discovery(); + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + reader.startReception(data); + // Send data + writer.send(data); + ASSERT_TRUE(data.empty()); + + // Block reader until reception finished or timeout. + if (reliable) + { + reader.block_for_all(); + } + else + { + reader.block_for_at_least(2); + } + } +}; + +TEST_P(PubSubFragments, PubSubAsNonReliableData300kb) +{ auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, false, false, false); +} - reader.startReception(data); - // Send data - writer.send(data); - ASSERT_TRUE(data.empty()); - // Block reader until reception finished or timeout. - reader.block_for_at_least(2); +TEST_P(PubSubFragments, PubSubAsNonReliableData300kbSmallFragments) +{ + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, false, false, true); } TEST_P(PubSubFragments, PubSubAsReliableData300kb) { - // Mutes an expected error - Log::SetErrorStringFilter(std::regex("^((?!Big data).)*$")); - - PubSubReader reader(TEST_TOPIC_NAME); - PubSubWriter writer(TEST_TOPIC_NAME); - - reader.history_depth(10). - reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).init(); - - ASSERT_TRUE(reader.isInitialized()); - - writer.history_depth(10). 
- reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).init(); + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, false, true, false); +} - ASSERT_TRUE(writer.isInitialized()); +TEST_P(PubSubFragments, PubSubAsReliableData300kbSmallFragments) +{ + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, false, true, true); +} - // Because its volatile the durability - // Wait for discovery. - writer.wait_discovery(); - reader.wait_discovery(); +TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kb) +{ + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, true, false, false); +} +TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kbSmallFragments) +{ auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, true, false, true); +} - reader.startReception(data); +TEST_P(PubSubFragments, AsyncPubSubAsReliableData300kb) +{ + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, true, true, false); +} - // Send data - writer.send(data); - ASSERT_TRUE(data.empty()); - // Block reader until reception finished or timeout. - reader.block_for_all(); +TEST_P(PubSubFragments, AsyncPubSubAsReliableData300kbSmallFragments) +{ + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, true, true, true); } -TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kb) +TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kbWithFlowControl) { PubSubReader reader(TEST_TOPIC_NAME); PubSubWriter writer(TEST_TOPIC_NAME); @@ -154,7 +202,7 @@ TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kb) reader.block_for_at_least(2); } -TEST_P(PubSubFragments, AsyncPubSubAsReliableData300kb) +TEST_P(PubSubFragments, AsyncPubSubAsReliableData300kbWithFlowControl) { PubSubReader reader(TEST_TOPIC_NAME); PubSubWriter writer(TEST_TOPIC_NAME); From 07991d2ae754b04ed43f7e1dfb647f85d28fdb23 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 24 Jan 2020 09:29:26 +0100 Subject: [PATCH 4/9] Refs #7399. Change approach of fix. 
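The reworked send_any_unsent_changes() below keeps the whole send pass inside a loop and detects flow-controller throttling by comparing the batch size before and after the controllers run: if the batch shrank, the loop stops and the remaining changes wait for the controller's next period; if it did not, the writer keeps looping until nothing is pending. A toy model of that control flow (reviewer sketch, not library code; the fixed budget_per_pass stands in for a bytes-per-period flow controller):

    #include <cstddef>
    #include <cstdio>
    #include <vector>

    int main()
    {
        std::vector<int> unsent = {1, 2, 3, 4, 5, 6, 7};
        const std::size_t budget_per_pass = 3; // stand-in for a bytes-per-period limit

        bool limited_by_controller = false;
        while (!unsent.empty() && !limited_by_controller)
        {
            std::vector<int> batch = unsent;        // changes selected for this pass
            const std::size_t before = batch.size();
            if (batch.size() > budget_per_pass)     // the "flow controller" trims the batch
            {
                batch.resize(budget_per_pass);
            }
            limited_by_controller = (batch.size() != before);

            for (int change : batch)
            {
                std::printf("sending change %d\n", change);
            }
            unsent.erase(unsent.begin(),
                    unsent.begin() + static_cast<std::ptrdiff_t>(batch.size()));
        }

        if (!unsent.empty())
        {
            // In the writer these are picked up on a later pass, once the
            // flow controller allows more data.
            std::printf("%zu changes deferred by the flow controller\n", unsent.size());
        }
        return 0;
    }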
--- .../fastrtps/rtps/writer/StatelessWriter.h | 2 +- src/cpp/rtps/writer/StatelessWriter.cpp | 125 +++++++++--------- 2 files changed, 65 insertions(+), 62 deletions(-) diff --git a/include/fastrtps/rtps/writer/StatelessWriter.h b/include/fastrtps/rtps/writer/StatelessWriter.h index e01ba699241..ad2ad05747d 100644 --- a/include/fastrtps/rtps/writer/StatelessWriter.h +++ b/include/fastrtps/rtps/writer/StatelessWriter.h @@ -114,7 +114,7 @@ class StatelessWriter : public RTPSWriter bool set_fixed_locators( const LocatorList_t& locator_list); - bool update_unsent_changes( + void update_unsent_changes( const SequenceNumber_t& seq_num, const FragmentNumber_t& frag_num); diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 19fb48c9626..47f7a377221 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -331,11 +331,10 @@ bool StatelessWriter::is_acked_by_all( return true; } -bool StatelessWriter::update_unsent_changes( +void StatelessWriter::update_unsent_changes( const SequenceNumber_t& seq_num, const FragmentNumber_t& frag_num) { - bool has_to_wake_async_thread = false; auto find_by_seq_num = [seq_num](const ChangeForReader_t& unsent_change) { return seq_num == unsent_change.getSequenceNumber(); @@ -356,13 +355,7 @@ bool StatelessWriter::update_unsent_changes( { unsent_changes_.remove_if(find_by_seq_num); } - else - { - has_to_wake_async_thread = true; - } } - - return has_to_wake_async_thread; } void StatelessWriter::send_any_unsent_changes() @@ -370,87 +363,97 @@ void StatelessWriter::send_any_unsent_changes() //TODO(Mcc) Separate sending for asynchronous writers std::lock_guard guard(mp_mutex); - RTPSWriterCollector changesToSend; - - for (const ChangeForReader_t& unsentChange : unsent_changes_) + bool flow_controllers_limited = false; + while (!unsent_changes_.empty() && !flow_controllers_limited) { - CacheChange_t* cache_change = unsentChange.getChange(); - changesToSend.add_change(cache_change, nullptr, unsentChange.getUnsentFragments()); + RTPSWriterCollector changesToSend; - uint64_t sequence_number = cache_change->sequenceNumber.to64long(); - // Filter intraprocess unsent changes - if (sequence_number > last_intraprocess_sequence_number_) + for (const ChangeForReader_t& unsentChange : unsent_changes_) { - last_intraprocess_sequence_number_ = sequence_number; - for (ReaderLocator& it : matched_readers_) + CacheChange_t* cache_change = unsentChange.getChange(); + changesToSend.add_change(cache_change, nullptr, unsentChange.getUnsentFragments()); + + uint64_t sequence_number = cache_change->sequenceNumber.to64long(); + // Filter intraprocess unsent changes + if (sequence_number > last_intraprocess_sequence_number_) { - if (it.is_local_reader()) + last_intraprocess_sequence_number_ = sequence_number; + for (ReaderLocator& it : matched_readers_) { - intraprocess_delivery(cache_change, it); + if (it.is_local_reader()) + { + intraprocess_delivery(cache_change, it); + } } } } - } - if (there_are_remote_readers_ || !fixed_locators_.empty()) - { - // Clear through local controllers - for (auto& controller : flow_controllers_) + if (there_are_remote_readers_ || !fixed_locators_.empty()) { - (*controller)(changesToSend); - } + // Read number of changes to send to detect if flow controllers limited them + size_t n_items = changesToSend.size(); - // Clear through parent controllers - for (auto& controller : mp_RTPSParticipant->getFlowControllers()) - { - (*controller)(changesToSend); - } + // Clear through 
local controllers + for (auto& controller : flow_controllers_) + { + (*controller)(changesToSend); + } - try - { - RTPSMessageGroup group(mp_RTPSParticipant, this, *this); + // Clear through parent controllers + for (auto& controller : mp_RTPSParticipant->getFlowControllers()) + { + (*controller)(changesToSend); + } + + flow_controllers_limited = n_items != changesToSend.size(); - bool bHasListener = mp_listener != nullptr; - while (!changesToSend.empty()) + try { - RTPSWriterCollector::Item changeToSend = changesToSend.pop(); + RTPSMessageGroup group(mp_RTPSParticipant, this, *this); - // Remove the messages selected for sending from the original list, - // and update those that were fragmented with the new sent index - if (update_unsent_changes(changeToSend.sequenceNumber, changeToSend.fragmentNumber)) + bool bHasListener = mp_listener != nullptr; + while (!changesToSend.empty()) { - mp_RTPSParticipant->async_thread().wake_up(this); - } + RTPSWriterCollector::Item changeToSend = changesToSend.pop(); - // Notify the controllers - FlowController::NotifyControllersChangeSent(changeToSend.cacheChange); + // Remove the messages selected for sending from the original list, + // and update those that were fragmented with the new sent index + update_unsent_changes(changeToSend.sequenceNumber, changeToSend.fragmentNumber); - if (changeToSend.fragmentNumber != 0) - { - if (!group.add_data_frag(*changeToSend.cacheChange, changeToSend.fragmentNumber, - is_inline_qos_expected_)) + // Notify the controllers + FlowController::NotifyControllersChangeSent(changeToSend.cacheChange); + + if (changeToSend.fragmentNumber != 0) { - logError(RTPS_WRITER, "Error sending fragment (" << changeToSend.sequenceNumber << + if (!group.add_data_frag(*changeToSend.cacheChange, changeToSend.fragmentNumber, + is_inline_qos_expected_)) + { + logError(RTPS_WRITER, "Error sending fragment (" << changeToSend.sequenceNumber << ", " << changeToSend.fragmentNumber << ")"); + } } - } - else - { - if (!group.add_data(*changeToSend.cacheChange, is_inline_qos_expected_)) + else { - logError(RTPS_WRITER, "Error sending change " << changeToSend.sequenceNumber); + if (!group.add_data(*changeToSend.cacheChange, is_inline_qos_expected_)) + { + logError(RTPS_WRITER, "Error sending change " << changeToSend.sequenceNumber); + } } - } - if (bHasListener && is_acked_by_all(changeToSend.cacheChange)) - { - mp_listener->onWriterChangeReceivedByAll(this, changeToSend.cacheChange); + if (bHasListener && is_acked_by_all(changeToSend.cacheChange)) + { + mp_listener->onWriterChangeReceivedByAll(this, changeToSend.cacheChange); + } } } + catch (const RTPSMessageGroup::timeout&) + { + logError(RTPS_WRITER, "Max blocking time reached"); + } } - catch (const RTPSMessageGroup::timeout&) + else { - logError(RTPS_WRITER, "Max blocking time reached"); + unsent_changes_.clear(); } } From 3d848a31c39e6b03a912853997e4a6fc0460825a Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 24 Jan 2020 09:37:14 +0100 Subject: [PATCH 5/9] Refs #7399. Fix for synchronous StatelessWriter. 
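The synchronous StatelessWriter path below drops the ChangeForReader_t / getUnsentFragments() iteration, whose FragmentNumberSet bitmap only covers a 256-entry window, and simply counts fragments from 1 to getFragmentCount(), so a change with more than 256 fragments is now sent completely. A standalone sketch of the new loop shape (reviewer sketch; add_data_frag() here is a local stand-in and the fragment count is a hypothetical example):

    #include <cstdint>
    #include <cstdio>

    // Stand-in for RTPSMessageGroup::add_data_frag(); pretend every fragment fits.
    static bool add_data_frag(uint32_t /*seq*/, uint32_t /*frag*/, bool /*inline_qos*/)
    {
        return true;
    }

    int main()
    {
        const uint32_t sequence_number = 1;
        const uint32_t n_fragments = 302;   // e.g. a 300 KB sample in ~1 KB fragments
        const bool inline_qos_expected = false;

        // Fragment numbers are 1-based in RTPS. A plain counter reaches every
        // fragment, whereas iterating a FragmentNumberSet only covers the
        // 256-entry bitmap window.
        for (uint32_t frag = 1; frag <= n_fragments; ++frag)
        {
            if (!add_data_frag(sequence_number, frag, inline_qos_expected))
            {
                std::fprintf(stderr, "Error sending fragment (%u, %u)\n",
                        sequence_number, frag);
            }
        }
        return 0;
    }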
--- src/cpp/rtps/writer/StatelessWriter.cpp | 46 +++++++++++-------------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 47f7a377221..5bdcf4846b7 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -178,20 +178,17 @@ void StatelessWriter::unsent_change_added_to_history( { RTPSMessageGroup group(mp_RTPSParticipant, this, it, max_blocking_time); - if (change->getFragmentCount() > 0) + uint32_t n_fragments = change->getFragmentCount(); + if (n_fragments > 0) { - ChangeForReader_t change_for_reader(change); - change_for_reader.getUnsentFragments().for_each([this, &group, - change](FragmentNumber_t fragment_number) - { - if (!group.add_data_frag(*change, fragment_number, - is_inline_qos_expected_)) - { - logError(RTPS_WRITER, - "Error sending fragment (" << change->sequenceNumber << - ", " << fragment_number << ")"); - } - }); + for (uint32_t frag = 1; frag <= n_fragments; frag++) + { + if (!group.add_data_frag(*change, frag, is_inline_qos_expected_)) + { + logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber + << ", " << frag << ")"); + } + } } else { @@ -217,20 +214,17 @@ void StatelessWriter::unsent_change_added_to_history( { RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time); - if (change->getFragmentCount() > 0) + uint32_t n_fragments = change->getFragmentCount(); + if (n_fragments > 0) { - ChangeForReader_t change_for_reader(change); - change_for_reader.getUnsentFragments().for_each([this, &group, - change](FragmentNumber_t fragment_number) - { - if (!group.add_data_frag(*change, fragment_number, - is_inline_qos_expected_)) - { - logError(RTPS_WRITER, - "Error sending fragment (" << change->sequenceNumber << - ", " << fragment_number << ")"); - } - }); + for (uint32_t frag = 1; frag <= n_fragments; frag++) + { + if (!group.add_data_frag(*change, frag, is_inline_qos_expected_)) + { + logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber + << ", " << frag << ")"); + } + } } else { From 2c15508d375ed27f853b8be8d125bb16ae7d91e2 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 24 Jan 2020 10:47:22 +0100 Subject: [PATCH 6/9] Refs #7399. Fix for synchronous StatefulWriter. 
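The same plain fragment counter is applied to the StatefulWriter below, with one extra duty: for every fragment that makes it into the message group, each matched reliable ReaderProxy records the fragment as sent via mark_fragment_as_sent_for_change(), so later NACK_FRAG handling only resends what a reader is actually missing. A toy model of that per-reader bookkeeping (reviewer sketch; a std::vector<bool> stands in for the real fragment bitmap kept inside ReaderProxy):

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    struct ReaderBookkeeping
    {
        std::vector<bool> sent; // index 0 corresponds to fragment number 1

        explicit ReaderBookkeeping(uint32_t n_fragments)
            : sent(n_fragments, false)
        {
        }

        // Mark one fragment as delivered; returns true once every fragment is.
        bool mark_fragment_as_sent(uint32_t frag)
        {
            sent[frag - 1] = true;
            for (bool b : sent)
            {
                if (!b)
                {
                    return false;
                }
            }
            return true;
        }
    };

    int main()
    {
        const uint32_t n_fragments = 3;
        ReaderBookkeeping reliable_reader(n_fragments);
        for (uint32_t frag = 1; frag <= n_fragments; ++frag)
        {
            const bool all_sent = reliable_reader.mark_fragment_as_sent(frag);
            std::printf("fragment %u sent, change complete: %s\n",
                    frag, all_sent ? "yes" : "no");
        }
        return 0;
    }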
--- src/cpp/rtps/writer/StatefulWriter.cpp | 32 ++++++++++++-------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 72c1e68827a..c9db4f553c1 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -225,13 +225,12 @@ void StatefulWriter::unsent_change_added_to_history( { RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time); - if (change->getFragmentCount() > 0) + uint32_t n_fragments = change->getFragmentCount(); + if (n_fragments > 0) { - ChangeForReader_t change_for_reader(change); - change_for_reader.getUnsentFragments().for_each([&](FragmentNumber_t fragment_number) + for (uint32_t frag = 1; frag <= n_fragments; frag++) { - if (group.add_data_frag(*change, fragment_number, - expectsInlineQos)) + if (group.add_data_frag(*change, frag, expectsInlineQos)) { for (ReaderProxy* it : matched_readers_) { @@ -239,18 +238,18 @@ void StatefulWriter::unsent_change_added_to_history( { bool allFragmentsSent = false; it->mark_fragment_as_sent_for_change( - change->sequenceNumber, - fragment_number, - allFragmentsSent); + change->sequenceNumber, + frag, + allFragmentsSent); } } } else { logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber << - ", " << fragment_number << ")"); + ", " << frag << ")"); } - }); + } } else { @@ -293,18 +292,17 @@ void StatefulWriter::unsent_change_added_to_history( RTPSMessageGroup group(mp_RTPSParticipant, this, it->message_sender(), max_blocking_time); - if (change->getFragmentCount() > 0) + uint32_t n_fragments = change->getFragmentCount(); + if (n_fragments > 0) { - ChangeForReader_t change_for_reader(change); - change_for_reader.getUnsentFragments().for_each([&](FragmentNumber_t fragment_number) + for (uint32_t frag = 1; frag <= n_fragments; frag++) { - if (!group.add_data_frag(*change, fragment_number, - it->expects_inline_qos())) + if (!group.add_data_frag(*change, frag, it->expects_inline_qos())) { logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber << - ", " << fragment_number << ")"); + ", " << frag << ")"); } - }); + } } else { From 3623adaff4fb3371a867497b8b6113509f9d1b85 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 24 Jan 2020 11:36:55 +0100 Subject: [PATCH 7/9] Refs #7399. 
Distinguish reserved_size and max_size on CDRMessage_t --- include/fastrtps/rtps/common/CDRMessage_t.h | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/include/fastrtps/rtps/common/CDRMessage_t.h b/include/fastrtps/rtps/common/CDRMessage_t.h index c73e7531978..6bbe6140051 100644 --- a/include/fastrtps/rtps/common/CDRMessage_t.h +++ b/include/fastrtps/rtps/common/CDRMessage_t.h @@ -54,6 +54,7 @@ struct RTPS_DllAPI CDRMessage_t{ length = 0; buffer = (octet*) malloc(RTPSMESSAGE_DEFAULT_SIZE); max_size = RTPSMESSAGE_DEFAULT_SIZE; + reserved_size = RTPSMESSAGE_DEFAULT_SIZE; #if __BIG_ENDIAN__ msg_endian = BIGEND; @@ -84,6 +85,7 @@ struct RTPS_DllAPI CDRMessage_t{ buffer = nullptr; max_size = size; + reserved_size = size; #if __BIG_ENDIAN__ msg_endian = BIGEND; @@ -103,6 +105,7 @@ struct RTPS_DllAPI CDRMessage_t{ length = payload.length; buffer = payload.data; max_size = payload.max_size; + reserved_size = payload.max_size; } CDRMessage_t(const CDRMessage_t& message) @@ -113,6 +116,7 @@ struct RTPS_DllAPI CDRMessage_t{ max_size = message.max_size; msg_endian = message.msg_endian; + reserved_size = max_size; if(max_size != 0) { buffer = (octet*)malloc(max_size); @@ -132,6 +136,8 @@ struct RTPS_DllAPI CDRMessage_t{ message.length = 0; max_size = message.max_size; message.max_size = 0; + reserved_size = message.reserved_size; + message.reserved_size = 0; msg_endian = message.msg_endian; #if __BIG_ENDIAN__ message.msg_endian = BIGEND; @@ -152,6 +158,8 @@ struct RTPS_DllAPI CDRMessage_t{ message.length = 0; max_size = message.max_size; message.max_size = 0; + reserved_size = message.reserved_size; + message.reserved_size = 0; msg_endian = message.msg_endian; #if __BIG_ENDIAN__ message.msg_endian = BIGEND; @@ -168,7 +176,7 @@ struct RTPS_DllAPI CDRMessage_t{ uint32_t size) { assert(wraps == false); - if (size > max_size) + if (size > reserved_size) { octet* new_buffer = (octet*) realloc(buffer, size); if (new_buffer == nullptr) @@ -178,9 +186,11 @@ struct RTPS_DllAPI CDRMessage_t{ else { buffer = new_buffer; - max_size = size; + reserved_size = size; } } + + max_size = size; } //!Pointer to the buffer where the data is stored. @@ -189,6 +199,8 @@ struct RTPS_DllAPI CDRMessage_t{ uint32_t pos; //!Max size of the message. uint32_t max_size; + //!Max size of the message. + uint32_t reserved_size; //!Current length of the message. uint32_t length; //!Endianness of the message. From 41d26681554d32ae461a46087977278c9eaa1a95 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 24 Jan 2020 11:47:24 +0100 Subject: [PATCH 8/9] Refs #7399. Fixing style and warnings. 
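A note on the CDRMessage_t change in the previous commit (7/9) before these style fixes: max_size now means "what the current message may use" while the new reserved_size tracks what is actually allocated, so reserve() can cap a shared buffer to a smaller datagram size (for example when a transport advertises a small maxMessageSize) without freeing or reallocating it, and only grows the allocation when a larger size is requested. A toy model of those semantics (reviewer sketch, not the real class; error handling simplified):

    #include <cstdint>
    #include <cstdlib>

    struct ToyCDRMessage
    {
        uint8_t* buffer = nullptr;
        uint32_t max_size = 0;      // size the current message is allowed to use
        uint32_t reserved_size = 0; // bytes actually allocated behind `buffer`

        // Grow the allocation only when needed; shrinking just lowers max_size.
        bool reserve(uint32_t size)
        {
            if (size > reserved_size)
            {
                auto* new_buffer = static_cast<uint8_t*>(std::realloc(buffer, size));
                if (new_buffer == nullptr)
                {
                    return false; // keep the old buffer and sizes untouched
                }
                buffer = new_buffer;
                reserved_size = size;
            }
            max_size = size;
            return true;
        }

        ~ToyCDRMessage()
        {
            std::free(buffer);
        }
    };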
--- src/cpp/publisher/PublisherImpl.cpp | 32 ++++++++----------- .../blackbox/BlackboxTestsPubSubFragments.cpp | 4 +-- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/src/cpp/publisher/PublisherImpl.cpp b/src/cpp/publisher/PublisherImpl.cpp index 255d6a8f22e..c9f300ac5e9 100644 --- a/src/cpp/publisher/PublisherImpl.cpp +++ b/src/cpp/publisher/PublisherImpl.cpp @@ -43,10 +43,6 @@ using namespace eprosima::fastrtps; using namespace ::rtps; using namespace std::chrono; -using namespace std::chrono; - -using namespace std::chrono; - PublisherImpl::PublisherImpl( ParticipantImpl* p, TopicDataType* pdatatype, @@ -76,18 +72,18 @@ PublisherImpl::PublisherImpl( , lifespan_duration_us_(m_att.qos.m_lifespan.duration.to_ns() * 1e-3) { deadline_timer_ = new TimedEvent(mp_participant->get_resource_event(), - [&]() -> bool - { - return deadline_missed(); - }, - att.qos.m_deadline.period.to_ns() * 1e-6); + [&]() -> bool + { + return deadline_missed(); + }, + att.qos.m_deadline.period.to_ns() * 1e-6); lifespan_timer_ = new TimedEvent(mp_participant->get_resource_event(), - [&]() -> bool - { - return lifespan_expired(); - }, - m_att.qos.m_lifespan.duration.to_ns() * 1e-6); + [&]() -> bool + { + return lifespan_expired(); + }, + m_att.qos.m_lifespan.duration.to_ns() * 1e-6); } PublisherImpl::~PublisherImpl() @@ -146,8 +142,8 @@ bool PublisherImpl::create_new_change_with_params( } // Block lowlevel writer - auto max_blocking_time = std::chrono::steady_clock::now() + - std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time)); + auto max_blocking_time = steady_clock::now() + + microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time)); #if HAVE_STRICT_REALTIME std::unique_lock lock(mp_writer->getMutex(), std::defer_lock); @@ -178,7 +174,7 @@ bool PublisherImpl::create_new_change_with_params( mp_writer->calculateMaxDataSize(m_att.throughputController.bytesPerPeriod); uint32_t participant_throughput_controller_bytes = mp_writer->calculateMaxDataSize( - mp_rtpsParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod); + mp_rtpsParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod); high_mark_for_frag_ = max_data_size > writer_throughput_controller_bytes ? @@ -236,7 +232,7 @@ bool PublisherImpl::create_new_change_with_params( if (m_att.qos.m_lifespan.duration != c_TimeInfinite) { - lifespan_duration_us_ = std::chrono::duration >( + lifespan_duration_us_ = duration >( m_att.qos.m_lifespan.duration.to_ns() * 1e-3); lifespan_timer_->update_interval_millisec(m_att.qos.m_lifespan.duration.to_ns() * 1e-6); lifespan_timer_->restart_timer(); diff --git a/test/blackbox/BlackboxTestsPubSubFragments.cpp b/test/blackbox/BlackboxTestsPubSubFragments.cpp index 4ab7f35d15c..18f457c3618 100644 --- a/test/blackbox/BlackboxTestsPubSubFragments.cpp +++ b/test/blackbox/BlackboxTestsPubSubFragments.cpp @@ -63,7 +63,7 @@ class PubSubFragments : public testing::TestWithParam reader .socket_buffer_size(1048576) // accomodate large and fast fragments - .history_depth(data.size()) + .history_depth(static_cast(data.size())) .reliability(reliable ? eprosima::fastrtps::RELIABLE_RELIABILITY_QOS : eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) @@ -87,7 +87,7 @@ class PubSubFragments : public testing::TestWithParam } writer - .history_depth(data.size()) + .history_depth(static_cast(data.size())) .reliability(reliable ? 
eprosima::fastrtps::RELIABLE_RELIABILITY_QOS : eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) From abe15fcc4f6c9ba473dff4b9a02210ec14c874c0 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 24 Jan 2020 12:42:51 +0100 Subject: [PATCH 9/9] Refs #7399. Improve best-effort tests. --- test/blackbox/BlackboxTestsPubSubFragments.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/blackbox/BlackboxTestsPubSubFragments.cpp b/test/blackbox/BlackboxTestsPubSubFragments.cpp index 18f457c3618..b52bc981464 100644 --- a/test/blackbox/BlackboxTestsPubSubFragments.cpp +++ b/test/blackbox/BlackboxTestsPubSubFragments.cpp @@ -102,7 +102,7 @@ class PubSubFragments : public testing::TestWithParam reader.startReception(data); // Send data - writer.send(data); + writer.send(data, reliable ? 0u : 10u); ASSERT_TRUE(data.empty()); // Block reader until reception finished or timeout.