diff --git a/include/fastrtps/rtps/common/CDRMessage_t.h b/include/fastrtps/rtps/common/CDRMessage_t.h
index c73e7531978..6bbe6140051 100644
--- a/include/fastrtps/rtps/common/CDRMessage_t.h
+++ b/include/fastrtps/rtps/common/CDRMessage_t.h
@@ -54,6 +54,7 @@ struct RTPS_DllAPI CDRMessage_t{
         length = 0;
         buffer = (octet*) malloc(RTPSMESSAGE_DEFAULT_SIZE);
         max_size = RTPSMESSAGE_DEFAULT_SIZE;
+        reserved_size = RTPSMESSAGE_DEFAULT_SIZE;

 #if __BIG_ENDIAN__
         msg_endian = BIGEND;
@@ -84,6 +85,7 @@ struct RTPS_DllAPI CDRMessage_t{

         buffer = nullptr;
         max_size = size;
+        reserved_size = size;

 #if __BIG_ENDIAN__
         msg_endian = BIGEND;
@@ -103,6 +105,7 @@ struct RTPS_DllAPI CDRMessage_t{
         length = payload.length;
         buffer = payload.data;
         max_size = payload.max_size;
+        reserved_size = payload.max_size;
     }

     CDRMessage_t(const CDRMessage_t& message)
@@ -113,6 +116,7 @@ struct RTPS_DllAPI CDRMessage_t{
         max_size = message.max_size;
         msg_endian = message.msg_endian;

+        reserved_size = max_size;
         if(max_size != 0)
         {
             buffer = (octet*)malloc(max_size);
@@ -132,6 +136,8 @@ struct RTPS_DllAPI CDRMessage_t{
         message.length = 0;
         max_size = message.max_size;
         message.max_size = 0;
+        reserved_size = message.reserved_size;
+        message.reserved_size = 0;
         msg_endian = message.msg_endian;
 #if __BIG_ENDIAN__
         message.msg_endian = BIGEND;
@@ -152,6 +158,8 @@ struct RTPS_DllAPI CDRMessage_t{
         message.length = 0;
         max_size = message.max_size;
         message.max_size = 0;
+        reserved_size = message.reserved_size;
+        message.reserved_size = 0;
         msg_endian = message.msg_endian;
 #if __BIG_ENDIAN__
         message.msg_endian = BIGEND;
@@ -168,7 +176,7 @@ struct RTPS_DllAPI CDRMessage_t{
             uint32_t size)
     {
         assert(wraps == false);
-        if (size > max_size)
+        if (size > reserved_size)
        {
             octet* new_buffer = (octet*) realloc(buffer, size);
             if (new_buffer == nullptr)
@@ -178,9 +186,11 @@ struct RTPS_DllAPI CDRMessage_t{
             else
             {
                 buffer = new_buffer;
-                max_size = size;
+                reserved_size = size;
             }
         }
+
+        max_size = size;
     }

     //!Pointer to the buffer where the data is stored.
@@ -187,8 +197,10 @@ struct RTPS_DllAPI CDRMessage_t{
     octet* buffer;
     //!Read or write position.
     uint32_t pos;
     //!Max size of the message.
     uint32_t max_size;
+    //!Size of the memory reserved for the buffer (may exceed max_size).
+    uint32_t reserved_size;
     //!Current length of the message.
     uint32_t length;
     //!Endianness of the message.
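With this change `max_size` becomes the logical limit writers may fill, while `reserved_size` tracks the actual allocation, so `reserve()` can lower the logical limit without freeing (or silently forgetting) memory that was already reserved. A minimal, self-contained sketch of the same grow-only pattern — names here are illustrative, not the library's API:

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>

struct Buffer
{
    uint8_t* data = nullptr;
    uint32_t max_size = 0;      // logical limit callers may use
    uint32_t reserved_size = 0; // bytes actually allocated

    // Grow the allocation only when the request exceeds what is already
    // reserved; a smaller request just lowers the logical limit and keeps
    // the existing allocation intact for later reuse.
    bool reserve(uint32_t size)
    {
        if (size > reserved_size)
        {
            uint8_t* p = static_cast<uint8_t*>(std::realloc(data, size));
            if (p == nullptr)
            {
                return false; // old buffer is still valid on failure
            }
            data = p;
            reserved_size = size;
        }
        max_size = size; // logical limit follows every request
        return true;
    }
};
```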
diff --git a/src/cpp/publisher/PublisherImpl.cpp b/src/cpp/publisher/PublisherImpl.cpp
index 2c0baacd423..c9f300ac5e9 100644
--- a/src/cpp/publisher/PublisherImpl.cpp
+++ b/src/cpp/publisher/PublisherImpl.cpp
@@ -43,10 +43,6 @@ using namespace eprosima::fastrtps;
 using namespace ::rtps;
 using namespace std::chrono;

-using namespace std::chrono;
-
-using namespace std::chrono;
-
 PublisherImpl::PublisherImpl(
         ParticipantImpl* p,
         TopicDataType* pdatatype,
@@ -60,7 +56,7 @@ PublisherImpl::PublisherImpl(
     , m_history(this, pdatatype->m_typeSize
 #if HAVE_SECURITY
             // In future v2 changepool is in writer, and writer set this value to cachechangepool.
-            + 20 /*SecureDataHeader*/ + 4 + ((2* 16) /*EVP_MAX_IV_LENGTH max block size*/ - 1 ) /* SecureDataBodey*/
+            + 20 /*SecureDataHeader*/ + 4 + ((2 * 16) /*EVP_MAX_IV_LENGTH max block size*/ - 1 ) /* SecureDataBody*/
             + 16 + 4 /*SecureDataTag*/
 #endif
             , att.topic.historyQos, att.topic.resourceLimitsQos, att.historyMemoryPolicy)
@@ -95,7 +91,7 @@ PublisherImpl::~PublisherImpl()
     delete(lifespan_timer_);
     delete(deadline_timer_);

-    if(mp_writer != nullptr)
+    if (mp_writer != nullptr)
     {
         logInfo(PUBLISHER, this->getGuid().entityId << " in topic: " << this->m_att.topic.topicName);
     }
@@ -104,8 +100,6 @@ PublisherImpl::~PublisherImpl()
     delete(this->mp_userPublisher);
 }

-
-
 bool PublisherImpl::create_new_change(
         ChangeKind_t changeKind,
         void* data)
@@ -127,79 +121,80 @@ bool PublisherImpl::create_new_change_with_params(
         return false;
     }

-    if(changeKind == NOT_ALIVE_UNREGISTERED || changeKind == NOT_ALIVE_DISPOSED ||
+    if (changeKind == NOT_ALIVE_UNREGISTERED || changeKind == NOT_ALIVE_DISPOSED ||
             changeKind == NOT_ALIVE_DISPOSED_UNREGISTERED)
     {
-        if(m_att.topic.topicKind == NO_KEY)
+        if (m_att.topic.topicKind == NO_KEY)
         {
-            logError(PUBLISHER,"Topic is NO_KEY, operation not permitted");
+            logError(PUBLISHER, "Topic is NO_KEY, operation not permitted");
             return false;
         }
     }

     InstanceHandle_t handle;
-    if(m_att.topic.topicKind == WITH_KEY)
+    if (m_att.topic.topicKind == WITH_KEY)
     {
         bool is_key_protected = false;
 #if HAVE_SECURITY
         is_key_protected = mp_writer->getAttributes().security_attributes().is_key_protected;
 #endif
-        mp_type->getKey(data,&handle,is_key_protected);
+        mp_type->getKey(data, &handle, is_key_protected);
     }

     // Block lowlevel writer
-    auto max_blocking_time = std::chrono::steady_clock::now() +
-            std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time));
+    auto max_blocking_time = steady_clock::now() +
+            microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time));
 #if HAVE_STRICT_REALTIME
     std::unique_lock<RecursiveTimedMutex> lock(mp_writer->getMutex(), std::defer_lock);
-    if(lock.try_lock_until(max_blocking_time))
+    if (lock.try_lock_until(max_blocking_time))
 #else
     std::unique_lock<RecursiveTimedMutex> lock(mp_writer->getMutex());
 #endif
     {
         CacheChange_t* ch = mp_writer->new_change(mp_type->getSerializedSizeProvider(data), changeKind, handle);
-        if(ch != nullptr)
+        if (ch != nullptr)
         {
-            if(changeKind == ALIVE)
+            if (changeKind == ALIVE)
             {
                 //If these two checks are correct, we assume the cachechange is valid and then we can write to it.
-                if(!mp_type->serialize(data, &ch->serializedPayload))
+                if (!mp_type->serialize(data, &ch->serializedPayload))
                 {
-                    logWarning(RTPS_WRITER,"RTPSWriter:Serialization returns false";);
+                    logWarning(RTPS_WRITER, "RTPSWriter: Serialization returns false");
                     m_history.release_Cache(ch);
                     return false;
                 }
             }

             //TODO(Ricardo) This logic in a class. Then a user of rtps layer can use it.
-            if(high_mark_for_frag_ == 0)
+            if (high_mark_for_frag_ == 0)
             {
                 uint32_t max_data_size = mp_writer->getMaxDataSize();
                 uint32_t writer_throughput_controller_bytes =
-                    mp_writer->calculateMaxDataSize(m_att.throughputController.bytesPerPeriod);
+                        mp_writer->calculateMaxDataSize(m_att.throughputController.bytesPerPeriod);
                 uint32_t participant_throughput_controller_bytes =
-                    mp_writer->calculateMaxDataSize(
+                        mp_writer->calculateMaxDataSize(
                             mp_rtpsParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod);

                 high_mark_for_frag_ =
-                    max_data_size > writer_throughput_controller_bytes ?
-                    writer_throughput_controller_bytes :
-                    (max_data_size > participant_throughput_controller_bytes ?
-                     participant_throughput_controller_bytes :
-                     max_data_size);
+                        max_data_size > writer_throughput_controller_bytes ?
+                        writer_throughput_controller_bytes :
+                        (max_data_size > participant_throughput_controller_bytes ?
+                        participant_throughput_controller_bytes :
+                        max_data_size);
+                high_mark_for_frag_ &= ~3;
             }

             uint32_t final_high_mark_for_frag = high_mark_for_frag_;

             // If inlineqos is needed for related_sample_identity, then remove the inlineqos size from final fragment size.
-            if(wparams.related_sample_identity() != SampleIdentity::unknown())
+            if (wparams.related_sample_identity() != SampleIdentity::unknown())
             {
                 final_high_mark_for_frag -= 32;
             }

             // If it is big data, fragment it.
-            if(ch->serializedPayload.length > final_high_mark_for_frag)
+            if (ch->serializedPayload.length > final_high_mark_for_frag)
             {
                 /// Fragment the data.
                 // Set the fragment size to the cachechange.
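Besides clamping the fragmentation threshold to the tightest of the writer's own maximum and the two throughput-controller budgets, the new `high_mark_for_frag_ &= ~3;` line rounds the result down to a multiple of 4, presumably so fragment boundaries stay on the 4-byte alignment CDR serialization works in. A small self-contained demonstration of the mask trick:

```cpp
#include <cassert>
#include <cstdint>

int main()
{
    // Clearing the two low bits rounds down to the previous multiple of 4,
    // exactly what `high_mark_for_frag_ &= ~3;` does in the patch.
    uint32_t high_mark = 65535;
    high_mark &= ~3u;

    assert(high_mark == 65532);     // previous multiple of 4
    assert((high_mark & 3u) == 0);  // 4-byte aligned
    return 0;
}
```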
@@ -208,7 +203,7 @@ bool PublisherImpl::create_new_change_with_params(
             }

             InstanceHandle_t change_handle = ch->instanceHandle;
-            if(!this->m_history.add_pub_change(ch, wparams, lock, max_blocking_time))
+            if (!this->m_history.add_pub_change(ch, wparams, lock, max_blocking_time))
             {
                 m_history.release_Cache(ch);
                 return false;
@@ -237,7 +232,8 @@ bool PublisherImpl::create_new_change_with_params(

             if (m_att.qos.m_lifespan.duration != c_TimeInfinite)
             {
-                lifespan_duration_us_ = std::chrono::duration<double, std::ratio<1, 1000000>>(m_att.qos.m_lifespan.duration.to_ns() * 1e-3);
+                lifespan_duration_us_ = duration<double, std::ratio<1, 1000000> >(
+                    m_att.qos.m_lifespan.duration.to_ns() * 1e-3);
                 lifespan_timer_->update_interval_millisec(m_att.qos.m_lifespan.duration.to_ns() * 1e-6);
                 lifespan_timer_->restart_timer();
             }
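The QoS durations are kept in nanoseconds and converted on assignment: `* 1e-3` yields a double-precision microsecond duration, `* 1e-6` the millisecond interval the timer expects. A standalone sketch of the same conversions, with illustrative values:

```cpp
#include <chrono>
#include <iostream>
#include <ratio>

int main()
{
    // Same representation as lifespan_duration_us_: microseconds in double.
    using us_double = std::chrono::duration<double, std::ratio<1, 1000000> >;

    long long lifespan_ns = 1500000000;        // 1.5 s expressed in nanoseconds
    us_double lifespan_us(lifespan_ns * 1e-3); // ns -> us, as in the patch
    double interval_ms = lifespan_ns * 1e-6;   // ns -> ms for the timer

    std::cout << lifespan_us.count() << " us, " << interval_ms << " ms\n";
    // Prints: 1.5e+06 us, 1500 ms
    return 0;
}
```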
@@ -249,13 +245,13 @@ bool PublisherImpl::create_new_change_with_params(
     return false;
 }

-
 bool PublisherImpl::removeMinSeqChange()
 {
     return m_history.removeMinChange();
 }

-bool PublisherImpl::removeAllChange(size_t* removed)
+bool PublisherImpl::removeAllChange(
+        size_t* removed)
 {
     return m_history.removeAllChange(removed);
 }
@@ -264,84 +260,86 @@ const GUID_t& PublisherImpl::getGuid()
 {
     return mp_writer->getGuid();
 }
+
 //
-bool PublisherImpl::updateAttributes(const PublisherAttributes& att)
+bool PublisherImpl::updateAttributes(
+        const PublisherAttributes& att)
 {
     bool updated = true;
     bool missing = false;
-    if(this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS)
+    if (this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS)
     {
-        if(att.unicastLocatorList.size() != this->m_att.unicastLocatorList.size() ||
+        if (att.unicastLocatorList.size() != this->m_att.unicastLocatorList.size() ||
                 att.multicastLocatorList.size() != this->m_att.multicastLocatorList.size())
         {
-            logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version");
+            logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version");
             updated &= false;
         }
         else
         {
-            for(LocatorListConstIterator lit1 = this->m_att.unicastLocatorList.begin();
-                    lit1!=this->m_att.unicastLocatorList.end();++lit1)
+            for (LocatorListConstIterator lit1 = this->m_att.unicastLocatorList.begin();
+                    lit1 != this->m_att.unicastLocatorList.end(); ++lit1)
             {
                 missing = true;
-                for(LocatorListConstIterator lit2 = att.unicastLocatorList.begin();
-                        lit2!= att.unicastLocatorList.end();++lit2)
+                for (LocatorListConstIterator lit2 = att.unicastLocatorList.begin();
+                        lit2 != att.unicastLocatorList.end(); ++lit2)
                 {
-                    if(*lit1 == *lit2)
+                    if (*lit1 == *lit2)
                     {
                         missing = false;
                         break;
                     }
                 }
-                if(missing)
+                if (missing)
                 {
-                    logWarning(PUBLISHER,"Locator: "<< *lit1 << " not present in new list");
-                    logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version");
+                    logWarning(PUBLISHER, "Locator: " << *lit1 << " not present in new list");
+                    logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version");
                 }
             }
-            for(LocatorListConstIterator lit1 = this->m_att.multicastLocatorList.begin();
-                    lit1!=this->m_att.multicastLocatorList.end();++lit1)
+            for (LocatorListConstIterator lit1 = this->m_att.multicastLocatorList.begin();
+                    lit1 != this->m_att.multicastLocatorList.end(); ++lit1)
             {
                 missing = true;
-                for(LocatorListConstIterator lit2 = att.multicastLocatorList.begin();
-                        lit2!= att.multicastLocatorList.end();++lit2)
+                for (LocatorListConstIterator lit2 = att.multicastLocatorList.begin();
+                        lit2 != att.multicastLocatorList.end(); ++lit2)
                 {
-                    if(*lit1 == *lit2)
+                    if (*lit1 == *lit2)
                     {
                         missing = false;
                         break;
                     }
                 }
-                if(missing)
+                if (missing)
                 {
-                    logWarning(PUBLISHER,"Locator: "<< *lit1<< " not present in new list");
-                    logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version");
+                    logWarning(PUBLISHER, "Locator: " << *lit1 << " not present in new list");
+                    logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version");
                 }
             }
         }
     }

     //TOPIC ATTRIBUTES
-    if(this->m_att.topic != att.topic)
+    if (this->m_att.topic != att.topic)
     {
-        logWarning(PUBLISHER,"Topic Attributes cannot be updated");
+        logWarning(PUBLISHER, "Topic Attributes cannot be updated");
         updated &= false;
     }
     //QOS:
     //CHECK IF THE QOS CAN BE SET
-    if(!this->m_att.qos.canQosBeUpdated(att.qos))
+    if (!this->m_att.qos.canQosBeUpdated(att.qos))
     {
-        updated &=false;
+        updated &= false;
     }
-    if(updated)
+    if (updated)
     {
-        if(this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS)
+        if (this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS)
         {
             //UPDATE TIMES:
             StatefulWriter* sfw = (StatefulWriter*)mp_writer;
             sfw->updateTimes(att.times);
         }
-        this->m_att.qos.setQos(att.qos,false);
+        this->m_att.qos.setQos(att.qos, false);
         this->m_att = att;
         //Notify the participant that a Writer has changed its QOS
         mp_rtpsParticipant->updateWriter(this->mp_writer, m_att.topic, m_att.qos);
@@ -351,7 +349,7 @@ bool PublisherImpl::updateAttributes(const PublisherAttributes& att)
     if (m_att.qos.m_deadline.period != c_TimeInfinite)
     {
         deadline_duration_us_ =
-            duration<double, std::ratio<1, 1000000>>(m_att.qos.m_deadline.period.to_ns() * 1e-3);
+                duration<double, std::ratio<1, 1000000> >(m_att.qos.m_deadline.period.to_ns() * 1e-3);
         deadline_timer_->update_interval_millisec(m_att.qos.m_deadline.period.to_ns() * 1e-6);
     }
     else
@@ -364,7 +362,7 @@ bool PublisherImpl::updateAttributes(const PublisherAttributes& att)
     if (m_att.qos.m_lifespan.duration != c_TimeInfinite)
     {
         lifespan_duration_us_ =
-            duration<double, std::ratio<1, 1000000>>(m_att.qos.m_lifespan.duration.to_ns() * 1e-3);
+                duration<double, std::ratio<1, 1000000> >(m_att.qos.m_lifespan.duration.to_ns() * 1e-3);
         lifespan_timer_->update_interval_millisec(m_att.qos.m_lifespan.duration.to_ns() * 1e-6);
     }
     else
@@ -380,7 +378,7 @@ void PublisherImpl::PublisherWriterListener::onWriterMatched(
         RTPSWriter* /*writer*/,
         MatchingInfo& info)
 {
-    if( mp_publisherImpl->mp_listener != nullptr )
+    if (mp_publisherImpl->mp_listener != nullptr)
     {
         mp_publisherImpl->mp_listener->onPublicationMatched(mp_publisherImpl->mp_userPublisher, info);
     }
@@ -397,20 +395,21 @@ void PublisherImpl::PublisherWriterListener::onWriterChangeReceivedByAll(
 }

 void PublisherImpl::PublisherWriterListener::on_liveliness_lost(
-        RTPSWriter *writer,
-        const LivelinessLostStatus &status)
+        RTPSWriter* writer,
+        const LivelinessLostStatus& status)
 {
     (void)writer;

     if (mp_publisherImpl->mp_listener != nullptr)
     {
         mp_publisherImpl->mp_listener->on_liveliness_lost(
-            mp_publisherImpl->mp_userPublisher,
-            status);
+                mp_publisherImpl->mp_userPublisher,
+                status);
     }
 }

-bool PublisherImpl::wait_for_all_acked(const eprosima::fastrtps::Time_t& max_wait)
+bool PublisherImpl::wait_for_all_acked(
+        const eprosima::fastrtps::Time_t& max_wait)
 {
     return mp_writer->wait_for_all_acked(max_wait);
 }
@@ -457,7 +456,8 @@ bool PublisherImpl::deadline_missed()
     return deadline_timer_reschedule();
 }

-void PublisherImpl::get_offered_deadline_missed_status(OfferedDeadlineMissedStatus &status)
+void PublisherImpl::get_offered_deadline_missed_status(
+        OfferedDeadlineMissedStatus& status)
 {
     std::unique_lock<RecursiveTimedMutex> lock(mp_writer->getMutex());
@@ -506,7 +506,8 @@ bool PublisherImpl::lifespan_expired()
     return true;
 }

-void PublisherImpl::get_liveliness_lost_status(LivelinessLostStatus &status)
+void PublisherImpl::get_liveliness_lost_status(
+        LivelinessLostStatus& status)
 {
     std::unique_lock<RecursiveTimedMutex> lock(mp_writer->getMutex());
diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp
index 72c1e68827a..c9db4f553c1 100644
--- a/src/cpp/rtps/writer/StatefulWriter.cpp
+++ b/src/cpp/rtps/writer/StatefulWriter.cpp
@@ -225,13 +225,12 @@ void StatefulWriter::unsent_change_added_to_history(
         {
             RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time);

-            if (change->getFragmentCount() > 0)
+            uint32_t n_fragments = change->getFragmentCount();
+            if (n_fragments > 0)
             {
-                ChangeForReader_t change_for_reader(change);
-                change_for_reader.getUnsentFragments().for_each([&](FragmentNumber_t fragment_number)
+                for (uint32_t frag = 1; frag <= n_fragments; frag++)
                 {
-                    if (group.add_data_frag(*change, fragment_number,
-                            expectsInlineQos))
+                    if (group.add_data_frag(*change, frag, expectsInlineQos))
                     {
                         for (ReaderProxy* it : matched_readers_)
                         {
@@ -239,18 +238,18 @@ void StatefulWriter::unsent_change_added_to_history(
                             {
                                 bool allFragmentsSent = false;
                                 it->mark_fragment_as_sent_for_change(
-                                    change->sequenceNumber,
-                                    fragment_number,
-                                    allFragmentsSent);
+                                        change->sequenceNumber,
+                                        frag,
+                                        allFragmentsSent);
                             }
                         }
                     }
                     else
                     {
                         logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber <<
-                            ", " << fragment_number << ")");
+                                ", " << frag << ")");
                     }
-                });
+                }
             }
             else
             {
@@ -293,18 +292,17 @@ void StatefulWriter::unsent_change_added_to_history(

             RTPSMessageGroup group(mp_RTPSParticipant, this, it->message_sender(), max_blocking_time);

-            if (change->getFragmentCount() > 0)
+            uint32_t n_fragments = change->getFragmentCount();
+            if (n_fragments > 0)
             {
-                ChangeForReader_t change_for_reader(change);
-                change_for_reader.getUnsentFragments().for_each([&](FragmentNumber_t fragment_number)
+                for (uint32_t frag = 1; frag <= n_fragments; frag++)
                 {
-                    if (!group.add_data_frag(*change, fragment_number,
-                            it->expects_inline_qos()))
+                    if (!group.add_data_frag(*change, frag, it->expects_inline_qos()))
                     {
                         logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber <<
-                            ", " << fragment_number << ")");
+                                ", " << frag << ")");
                     }
-                });
+                }
             }
             else
             {
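Both branches replace the temporary `ChangeForReader_t` and its `for_each` over unsent fragments with a plain counting loop. Note the loop runs from 1 to `n_fragments` inclusive: RTPS fragment numbers are 1-based. A sketch of the numbering, assuming the usual ceiling division for the fragment count (names are illustrative):

```cpp
#include <cstdint>
#include <iostream>

int main()
{
    uint32_t length = 300000;       // e.g. a 300 kB sample
    uint16_t fragment_size = 1024;  // bytes carried by each DATA_FRAG

    // ceil(length / fragment_size) without floating point
    uint32_t n_fragments = (length + fragment_size - 1) / fragment_size;

    for (uint32_t frag = 1; frag <= n_fragments; frag++)
    {
        // send_fragment(change, frag) would go here; the last fragment
        // (frag == n_fragments) may be shorter than fragment_size.
    }

    std::cout << n_fragments << " fragments\n"; // 293 for these values
    return 0;
}
```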
diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp
index 4d34a7dea65..5bdcf4846b7 100644
--- a/src/cpp/rtps/writer/StatelessWriter.cpp
+++ b/src/cpp/rtps/writer/StatelessWriter.cpp
@@ -178,18 +178,17 @@ void StatelessWriter::unsent_change_added_to_history(
         {
             RTPSMessageGroup group(mp_RTPSParticipant, this, it,
                     max_blocking_time);

-            if(change->getFragmentCount() > 0)
+            uint32_t n_fragments = change->getFragmentCount();
+            if (n_fragments > 0)
             {
-                ChangeForReader_t change_for_reader(change);
-                change_for_reader.getUnsentFragments().for_each([this, &group, change](FragmentNumber_t fragment_number)
+                for (uint32_t frag = 1; frag <= n_fragments; frag++)
                 {
-                    if (!group.add_data_frag(*change, fragment_number,
-                            is_inline_qos_expected_))
+                    if (!group.add_data_frag(*change, frag, is_inline_qos_expected_))
                     {
-                        logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber <<
-                            ", " << fragment_number << ")");
+                        logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber
+                                << ", " << frag << ")");
                     }
-                });
+                }
             }
             else
             {
@@ -211,22 +210,21 @@ void StatelessWriter::unsent_change_added_to_history(
         }
     }

-    if(there_are_remote_readers_ || !fixed_locators_.empty())
+    if (there_are_remote_readers_ || !fixed_locators_.empty())
     {
         RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time);

-        if(change->getFragmentCount() > 0)
+        uint32_t n_fragments = change->getFragmentCount();
+        if (n_fragments > 0)
         {
-            ChangeForReader_t change_for_reader(change);
-            change_for_reader.getUnsentFragments().for_each([this, &group, change](FragmentNumber_t fragment_number)
+            for (uint32_t frag = 1; frag <= n_fragments; frag++)
             {
-                if (!group.add_data_frag(*change, fragment_number,
-                        is_inline_qos_expected_))
+                if (!group.add_data_frag(*change, frag, is_inline_qos_expected_))
                 {
-                    logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber <<
-                        ", " << fragment_number << ")");
+                    logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber
+                            << ", " << frag << ")");
                 }
-            });
+            }
         }
         else
         {
@@ -359,84 +357,97 @@ void StatelessWriter::send_any_unsent_changes()
     //TODO(Mcc) Separate sending for asynchronous writers
     std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);

-    RTPSWriterCollector<ReaderLocator*> changesToSend;
-
-    for (const ChangeForReader_t& unsentChange : unsent_changes_)
+    bool flow_controllers_limited = false;
+    while (!unsent_changes_.empty() && !flow_controllers_limited)
     {
-        CacheChange_t* cache_change = unsentChange.getChange();
-        changesToSend.add_change(cache_change, nullptr, unsentChange.getUnsentFragments());
+        RTPSWriterCollector<ReaderLocator*> changesToSend;

-        uint64_t sequence_number = cache_change->sequenceNumber.to64long();
-        // Filter intraprocess unsent changes
-        if (sequence_number > last_intraprocess_sequence_number_)
+        for (const ChangeForReader_t& unsentChange : unsent_changes_)
         {
-            last_intraprocess_sequence_number_ = sequence_number;
-            for (ReaderLocator& it : matched_readers_)
+            CacheChange_t* cache_change = unsentChange.getChange();
+            changesToSend.add_change(cache_change, nullptr, unsentChange.getUnsentFragments());
+
+            uint64_t sequence_number = cache_change->sequenceNumber.to64long();
+            // Filter intraprocess unsent changes
+            if (sequence_number > last_intraprocess_sequence_number_)
             {
-                if (it.is_local_reader())
+                last_intraprocess_sequence_number_ = sequence_number;
+                for (ReaderLocator& it : matched_readers_)
                 {
-                    intraprocess_delivery(cache_change, it);
+                    if (it.is_local_reader())
+                    {
+                        intraprocess_delivery(cache_change, it);
+                    }
                 }
             }
         }
-    }

-    if (there_are_remote_readers_ || !fixed_locators_.empty())
-    {
-        // Clear through local controllers
-        for (auto& controller : flow_controllers_)
+        if (there_are_remote_readers_ || !fixed_locators_.empty())
         {
-            (*controller)(changesToSend);
-        }
+            // Read number of changes to send to detect if flow controllers limited them
+            size_t n_items = changesToSend.size();

-        // Clear through parent controllers
-        for (auto& controller : mp_RTPSParticipant->getFlowControllers())
-        {
-            (*controller)(changesToSend);
-        }
-
-        try
-        {
-            RTPSMessageGroup group(mp_RTPSParticipant, this, *this);
+            // Clear through local controllers
+            for (auto& controller : flow_controllers_)
+            {
+                (*controller)(changesToSend);
+            }

-            bool bHasListener = mp_listener != nullptr;
-            while (!changesToSend.empty())
+            // Clear through parent controllers
+            for (auto& controller : mp_RTPSParticipant->getFlowControllers())
             {
-                RTPSWriterCollector<ReaderLocator*>::Item changeToSend = changesToSend.pop();
+                (*controller)(changesToSend);
+            }

-                // Remove the messages selected for sending from the original list,
-                // and update those that were fragmented with the new sent index
-                update_unsent_changes(changeToSend.sequenceNumber, changeToSend.fragmentNumber);
+            flow_controllers_limited = n_items != changesToSend.size();

-                // Notify the controllers
-                FlowController::NotifyControllersChangeSent(changeToSend.cacheChange);
+            try
+            {
+                RTPSMessageGroup group(mp_RTPSParticipant, this, *this);

-                if (changeToSend.fragmentNumber != 0)
+                bool bHasListener = mp_listener != nullptr;
+                while (!changesToSend.empty())
                 {
-                    if (!group.add_data_frag(*changeToSend.cacheChange, changeToSend.fragmentNumber,
-                            is_inline_qos_expected_))
+                    RTPSWriterCollector<ReaderLocator*>::Item changeToSend = changesToSend.pop();
+
+                    // Remove the messages selected for sending from the original list,
+                    // and update those that were fragmented with the new sent index
+                    update_unsent_changes(changeToSend.sequenceNumber, changeToSend.fragmentNumber);
+
+                    // Notify the controllers
+                    FlowController::NotifyControllersChangeSent(changeToSend.cacheChange);
+
+                    if (changeToSend.fragmentNumber != 0)
                     {
-                        logError(RTPS_WRITER, "Error sending fragment (" << changeToSend.sequenceNumber <<
-                            ", " << changeToSend.fragmentNumber << ")");
+                        if (!group.add_data_frag(*changeToSend.cacheChange, changeToSend.fragmentNumber,
+                                is_inline_qos_expected_))
+                        {
+                            logError(RTPS_WRITER, "Error sending fragment (" << changeToSend.sequenceNumber <<
+                                    ", " << changeToSend.fragmentNumber << ")");
+                        }
                     }
-                }
-                else
-                {
-                    if (!group.add_data(*changeToSend.cacheChange, is_inline_qos_expected_))
+                    else
                     {
-                        logError(RTPS_WRITER, "Error sending change " << changeToSend.sequenceNumber);
+                        if (!group.add_data(*changeToSend.cacheChange, is_inline_qos_expected_))
+                        {
+                            logError(RTPS_WRITER, "Error sending change " << changeToSend.sequenceNumber);
+                        }
                     }
-                }

-                if (bHasListener && is_acked_by_all(changeToSend.cacheChange))
-                {
-                    mp_listener->onWriterChangeReceivedByAll(this, changeToSend.cacheChange);
+                    if (bHasListener && is_acked_by_all(changeToSend.cacheChange))
+                    {
+                        mp_listener->onWriterChangeReceivedByAll(this, changeToSend.cacheChange);
+                    }
                 }
             }
+            catch (const RTPSMessageGroup::timeout&)
+            {
+                logError(RTPS_WRITER, "Max blocking time reached");
+            }
         }
-        catch (const RTPSMessageGroup::timeout&)
+        else
         {
-            logError(RTPS_WRITER, "Max blocking time reached");
+            unsent_changes_.clear();
         }
     }
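Flow controllers filter the collector in place, so the rewritten loop samples `size()` before and after running them: if any change was withheld, `flow_controllers_limited` stops the `while` loop, and sending resumes when the controller reschedules the writer. The idiom, reduced to a self-contained sketch with illustrative types:

```cpp
#include <cstddef>
#include <vector>

struct Change
{
    unsigned sequence_number;
};

// Stand-in for a flow controller: trims the batch in place to its budget.
void apply_flow_controller(std::vector<Change>& batch, size_t budget)
{
    if (batch.size() > budget)
    {
        batch.resize(budget); // keep only what fits in this period
    }
}

bool send_pending(std::vector<Change>& pending, size_t budget)
{
    size_t n_items = pending.size();
    apply_flow_controller(pending, budget);

    // If the controller removed anything, it is currently limiting output.
    bool limited = (n_items != pending.size());

    // ... send what survived the controller ...

    return limited; // caller stops looping until the controller reschedules
}
```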
diff --git a/test/blackbox/BlackboxTestsPubSubFragments.cpp b/test/blackbox/BlackboxTestsPubSubFragments.cpp
index 0481de2ea2a..b52bc981464 100644
--- a/test/blackbox/BlackboxTestsPubSubFragments.cpp
+++ b/test/blackbox/BlackboxTestsPubSubFragments.cpp
@@ -49,75 +49,123 @@ class PubSubFragments : public testing::TestWithParam<bool>
         }
     }

-};
+protected:
+
+    void do_fragment_test(
+            const std::string& topic_name,
+            std::list<Data1mb>& data,
+            bool asynchronous,
+            bool reliable,
+            bool small_fragments)
+    {
+        PubSubReader<Data1mb> reader(topic_name);
+        PubSubWriter<Data1mb> writer(topic_name);

-TEST_P(PubSubFragments, PubSubAsNonReliableData300kb)
-{
-    // Mutes an expected error
-    Log::SetErrorStringFilter(std::regex("^((?!Big data).)*$"));
+        reader
+            .socket_buffer_size(1048576) // accommodate large and fast fragments
+            .history_depth(static_cast<uint32_t>(data.size()))
+            .reliability(reliable ?
+                eprosima::fastrtps::RELIABLE_RELIABILITY_QOS :
+                eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS)
+            .init();

-    PubSubReader<Data1mb> reader(TEST_TOPIC_NAME);
-    PubSubWriter<Data1mb> writer(TEST_TOPIC_NAME);
+        ASSERT_TRUE(reader.isInitialized());

-    //Increasing reception buffer to accomodate large and fast fragments
-    reader.socket_buffer_size(1048576).init();
+        if (small_fragments)
+        {
+            auto testTransport = std::make_shared<UDPv4TransportDescriptor>();
+            testTransport->sendBufferSize = 1024;
+            testTransport->maxMessageSize = 1024;
+            testTransport->receiveBufferSize = 65536;
+            writer.disable_builtin_transport();
+            writer.add_user_transport_to_pparams(testTransport);
+        }

-    ASSERT_TRUE(reader.isInitialized());
+        if (asynchronous)
+        {
+            writer.asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE);
+        }

-    writer.reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS).init();
+        writer
+            .history_depth(static_cast<uint32_t>(data.size()))
+            .reliability(reliable ?
+                eprosima::fastrtps::RELIABLE_RELIABILITY_QOS :
+                eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS)
+            .init();

-    ASSERT_TRUE(writer.isInitialized());
+        ASSERT_TRUE(writer.isInitialized());

-    // Because its volatile the durability
-    // Wait for discovery.
-    writer.wait_discovery();
-    reader.wait_discovery();
+        // Because the durability is volatile, wait for discovery.
+        writer.wait_discovery();
+        reader.wait_discovery();

+        reader.startReception(data);
+
+        // Send data
+        writer.send(data, reliable ? 0u : 10u);
+        ASSERT_TRUE(data.empty());
+
+        // Block reader until reception finished or timeout.
+        if (reliable)
+        {
+            reader.block_for_all();
+        }
+        else
+        {
+            reader.block_for_at_least(2);
+        }
+    }
+};
+
+TEST_P(PubSubFragments, PubSubAsNonReliableData300kb)
+{
     auto data = default_data300kb_data_generator();
+    do_fragment_test(TEST_TOPIC_NAME, data, false, false, false);
+}

-    reader.startReception(data);
-    // Send data
-    writer.send(data);
-    ASSERT_TRUE(data.empty());
-    // Block reader until reception finished or timeout.
-    reader.block_for_at_least(2);
+TEST_P(PubSubFragments, PubSubAsNonReliableData300kbSmallFragments)
+{
+    auto data = default_data300kb_data_generator();
+    do_fragment_test(TEST_TOPIC_NAME, data, false, false, true);
 }

 TEST_P(PubSubFragments, PubSubAsReliableData300kb)
 {
-    // Mutes an expected error
-    Log::SetErrorStringFilter(std::regex("^((?!Big data).)*$"));
-
-    PubSubReader<Data1mb> reader(TEST_TOPIC_NAME);
-    PubSubWriter<Data1mb> writer(TEST_TOPIC_NAME);
-
-    reader.history_depth(10).
-        reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).init();
-
-    ASSERT_TRUE(reader.isInitialized());
-
-    writer.history_depth(10).
-        reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).init();
+    auto data = default_data300kb_data_generator();
+    do_fragment_test(TEST_TOPIC_NAME, data, false, true, false);
+}

-    ASSERT_TRUE(writer.isInitialized());
+TEST_P(PubSubFragments, PubSubAsReliableData300kbSmallFragments)
+{
+    auto data = default_data300kb_data_generator();
+    do_fragment_test(TEST_TOPIC_NAME, data, false, true, true);
+}

-    // Because its volatile the durability
-    // Wait for discovery.
-    writer.wait_discovery();
-    reader.wait_discovery();
+TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kb)
+{
     auto data = default_data300kb_data_generator();
+    do_fragment_test(TEST_TOPIC_NAME, data, true, false, false);
+}

+TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kbSmallFragments)
+{
     auto data = default_data300kb_data_generator();
+    do_fragment_test(TEST_TOPIC_NAME, data, true, false, true);
+}

-    reader.startReception(data);
+TEST_P(PubSubFragments, AsyncPubSubAsReliableData300kb)
+{
+    auto data = default_data300kb_data_generator();
+    do_fragment_test(TEST_TOPIC_NAME, data, true, true, false);
 }

-    // Send data
-    writer.send(data);
-    ASSERT_TRUE(data.empty());
-    // Block reader until reception finished or timeout.
-    reader.block_for_all();
+TEST_P(PubSubFragments, AsyncPubSubAsReliableData300kbSmallFragments)
+{
+    auto data = default_data300kb_data_generator();
+    do_fragment_test(TEST_TOPIC_NAME, data, true, true, true);
 }

-TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kb)
+TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kbWithFlowControl)
 {
     PubSubReader<Data1mb> reader(TEST_TOPIC_NAME);
     PubSubWriter<Data1mb> writer(TEST_TOPIC_NAME);
@@ -154,7 +202,7 @@ TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kb)
     reader.block_for_at_least(2);
 }

-TEST_P(PubSubFragments, AsyncPubSubAsReliableData300kb)
+TEST_P(PubSubFragments, AsyncPubSubAsReliableData300kbWithFlowControl)
 {
     PubSubReader<Data1mb> reader(TEST_TOPIC_NAME);
     PubSubWriter<Data1mb> writer(TEST_TOPIC_NAME);
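The new `SmallFragments` variants force fragmentation by capping the transport's message size at 1 KiB rather than relying on flow control. A sketch of equivalent participant attributes — it assumes the stock `UDPv4TransportDescriptor` and mirrors the attribute values used in `do_fragment_test` above:

```cpp
#include <memory>

#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/transport/UDPv4TransportDescriptor.h>

eprosima::fastrtps::ParticipantAttributes small_fragment_participant()
{
    // Any RTPS message, and therefore any DATA_FRAG, must fit in 1024 bytes,
    // so a 300 kB sample is forced through hundreds of fragments.
    auto transport = std::make_shared<eprosima::fastrtps::rtps::UDPv4TransportDescriptor>();
    transport->sendBufferSize = 1024;
    transport->maxMessageSize = 1024;
    transport->receiveBufferSize = 65536;

    eprosima::fastrtps::ParticipantAttributes attrs;
    attrs.rtps.useBuiltinTransports = false; // mirrors disable_builtin_transport()
    attrs.rtps.userTransports.push_back(transport);
    return attrs;
}
```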