diff --git a/include/fastdds/rtps/common/CDRMessage_t.h b/include/fastdds/rtps/common/CDRMessage_t.h index 9405506c86c..118820050d6 100644 --- a/include/fastdds/rtps/common/CDRMessage_t.h +++ b/include/fastdds/rtps/common/CDRMessage_t.h @@ -26,10 +26,9 @@ #include #include -namespace eprosima{ -namespace fastrtps{ -namespace rtps{ - +namespace eprosima { +namespace fastrtps { +namespace rtps { //!Max size of RTPS message in bytes. #define RTPSMESSAGE_DEFAULT_SIZE 10500 //max size of rtps message in bytes @@ -48,43 +47,44 @@ namespace rtps{ * @brief Structure CDRMessage_t, contains a serialized message. * @ingroup COMMON_MODULE */ -struct RTPS_DllAPI CDRMessage_t{ - //! Default constructor - CDRMessage_t():wraps(false){ - pos = 0; - length = 0; - buffer = (octet*) malloc(RTPSMESSAGE_DEFAULT_SIZE); - max_size = RTPSMESSAGE_DEFAULT_SIZE; - -#if __BIG_ENDIAN__ - msg_endian = BIGEND; -#else - msg_endian = LITTLEEND; -#endif +struct RTPS_DllAPI CDRMessage_t final +{ + // TODO(Miguel C): Deprecate when not used in mocks + CDRMessage_t() + : CDRMessage_t(RTPSMESSAGE_DEFAULT_SIZE) + { } ~CDRMessage_t() { - if(buffer != nullptr && !wraps) + if (buffer != nullptr && !wraps) + { free(buffer); + } } /** * Constructor with maximum size * @param size Maximum size */ - CDRMessage_t(uint32_t size) + explicit CDRMessage_t( + uint32_t size) { wraps = false; pos = 0; length = 0; - if(size != 0) + if (size != 0) + { buffer = (octet*)malloc(size); + } else + { buffer = nullptr; + } max_size = size; + reserved_size = size; #if __BIG_ENDIAN__ msg_endian = BIGEND; @@ -94,19 +94,23 @@ struct RTPS_DllAPI CDRMessage_t{ } /** - * Constructor to wrap a serialized payload - * @param payload Payload to wrap - */ - CDRMessage_t(const SerializedPayload_t& payload) : wraps(true) + * Constructor to wrap a serialized payload + * @param payload Payload to wrap + */ + explicit CDRMessage_t( + const SerializedPayload_t& payload) + : wraps(true) { msg_endian = payload.encapsulation == PL_CDR_BE ? BIGEND : LITTLEEND; pos = payload.pos; length = payload.length; buffer = payload.data; max_size = payload.max_size; + reserved_size = payload.max_size; } - CDRMessage_t(const CDRMessage_t& message) + CDRMessage_t( + const CDRMessage_t& message) { wraps = false; pos = 0; @@ -114,16 +118,20 @@ struct RTPS_DllAPI CDRMessage_t{ max_size = message.max_size; msg_endian = message.msg_endian; - if(max_size != 0) + reserved_size = max_size; + if (max_size != 0) { buffer = (octet*)malloc(max_size); memcpy(buffer, message.buffer, length); } else + { buffer = nullptr; + } } - CDRMessage_t(CDRMessage_t&& message) + CDRMessage_t( + CDRMessage_t&& message) { wraps = message.wraps; message.wraps = false; @@ -133,6 +141,8 @@ struct RTPS_DllAPI CDRMessage_t{ message.length = 0; max_size = message.max_size; message.max_size = 0; + reserved_size = message.reserved_size; + message.reserved_size = 0; msg_endian = message.msg_endian; #if __BIG_ENDIAN__ message.msg_endian = BIGEND; @@ -143,7 +153,8 @@ struct RTPS_DllAPI CDRMessage_t{ message.buffer = nullptr; } - CDRMessage_t& operator=(CDRMessage_t &&message) + CDRMessage_t& operator =( + CDRMessage_t&& message) { wraps = message.wraps; message.wraps = false; @@ -153,6 +164,8 @@ struct RTPS_DllAPI CDRMessage_t{ message.length = 0; max_size = message.max_size; message.max_size = 0; + reserved_size = message.reserved_size; + message.reserved_size = 0; msg_endian = message.msg_endian; #if __BIG_ENDIAN__ message.msg_endian = BIGEND; @@ -169,7 +182,7 @@ struct RTPS_DllAPI CDRMessage_t{ uint32_t size) { assert(wraps == false); - if (size > max_size) + if (size > reserved_size) { octet* new_buffer = (octet*) realloc(buffer, size); if (new_buffer == nullptr) @@ -179,9 +192,11 @@ struct RTPS_DllAPI CDRMessage_t{ else { buffer = new_buffer; - max_size = size; + reserved_size = size; } } + + max_size = size; } //!Pointer to the buffer where the data is stored. @@ -190,6 +205,8 @@ struct RTPS_DllAPI CDRMessage_t{ uint32_t pos; //!Max size of the message. uint32_t max_size; + //!Size allocated on buffer. May be higher than max_size. + uint32_t reserved_size; //!Current length of the message. uint32_t length; //!Endianness of the message. @@ -198,8 +215,9 @@ struct RTPS_DllAPI CDRMessage_t{ bool wraps; }; -} -} -} -#endif +} // namespace rtps +} // namespace fastrtps +} // namespace eprosima + +#endif /* DOXYGEN_SHOULD_SKIP_THIS_PUBLIC */ #endif /* _FASTDDS_RTPS_CDRMESSAGE_T_H_ */ diff --git a/include/fastdds/rtps/messages/RTPSMessageCreator.h b/include/fastdds/rtps/messages/RTPSMessageCreator.h index 11949ae1197..8ed9069e71f 100644 --- a/include/fastdds/rtps/messages/RTPSMessageCreator.h +++ b/include/fastdds/rtps/messages/RTPSMessageCreator.h @@ -51,15 +51,8 @@ class RTPSMessageCreator { public: - RTPSMessageCreator(); - virtual ~RTPSMessageCreator(); - - //! - CDRMessage_t rtpsmc_submsgElem; - //! - CDRMessage_t rtpsmc_submsgHeader; - - + RTPSMessageCreator() = delete; + ~RTPSMessageCreator() = delete; /** * Create a Header to the serialized message. diff --git a/include/fastdds/rtps/network/ReceiverResource.h b/include/fastdds/rtps/network/ReceiverResource.h index c04ba27705c..a5c5303c6b6 100644 --- a/include/fastdds/rtps/network/ReceiverResource.h +++ b/include/fastdds/rtps/network/ReceiverResource.h @@ -93,7 +93,6 @@ class ReceiverResource : public fastdds::rtps::TransportReceiverInterface std::mutex mtx; MessageReceiver* receiver; - CDRMessage_t msg; }; } // namespace rtps diff --git a/src/cpp/dds/topic/DataWriterImpl.cpp b/src/cpp/dds/topic/DataWriterImpl.cpp index 9335cba981a..6c6c2abee1f 100644 --- a/src/cpp/dds/topic/DataWriterImpl.cpp +++ b/src/cpp/dds/topic/DataWriterImpl.cpp @@ -63,11 +63,11 @@ DataWriterImpl::DataWriterImpl( , qos_(&qos == &DATAWRITER_QOS_DEFAULT ? publisher_->get_default_datawriter_qos() : qos) , history_(topic_att_, type_->m_typeSize #if HAVE_SECURITY - // In future v2 changepool is in writer, and writer set this value to cachechagepool. - + 20 /*SecureDataHeader*/ + 4 + ((2* 16) /*EVP_MAX_IV_LENGTH max block size*/ - 1 ) /* SecureDataBodey*/ - + 16 + 4 /*SecureDataTag*/ + // In future v2 changepool is in writer, and writer set this value to cachechagepool. + + 20 /*SecureDataHeader*/ + 4 + ((2 * 16) /*EVP_MAX_IV_LENGTH max block size*/ - 1 ) /* SecureDataBodey*/ + + 16 + 4 /*SecureDataTag*/ #endif - , memory_policy) + , memory_policy) //, history_(std::move(history)) , listener_(listen) #pragma warning (disable : 4355 ) @@ -80,24 +80,24 @@ DataWriterImpl::DataWriterImpl( , user_datawriter_(nullptr) { deadline_timer_ = new TimedEvent(publisher_->get_participant()->get_resource_event(), - [&]() -> bool - { - return deadline_missed(); - }, - qos_.m_deadline.period.to_ns() * 1e-6); + [&]() -> bool + { + return deadline_missed(); + }, + qos_.m_deadline.period.to_ns() * 1e-6); lifespan_timer_ = new TimedEvent(publisher_->get_participant()->get_resource_event(), - [&]() -> bool - { - return lifespan_expired(); - }, - qos_.m_lifespan.duration.to_ns() * 1e-6); + [&]() -> bool + { + return lifespan_expired(); + }, + qos_.m_lifespan.duration.to_ns() * 1e-6); RTPSWriter* writer = RTPSDomain::createRTPSWriter( - publisher_->rtps_participant(), - w_att_, - static_cast(&history_), - static_cast(&writer_listener_)); + publisher_->rtps_participant(), + w_att_, + static_cast(&history_), + static_cast(&writer_listener_)); if (writer == nullptr) { @@ -146,8 +146,8 @@ bool DataWriterImpl::write( } ReturnCode_t DataWriterImpl::write( - void* data, - const fastrtps::rtps::InstanceHandle_t& handle) + void* data, + const fastrtps::rtps::InstanceHandle_t& handle) { //TODO Review when HANDLE_NIL is implemented as this just checks if the handle is 0, // but it need to check if there is an existing entity with that handle @@ -197,7 +197,6 @@ bool DataWriterImpl::create_new_change( return create_new_change_with_params(changeKind, data, wparams); } - bool DataWriterImpl::check_new_change_preconditions( ChangeKind_t change_kind, void* data) @@ -210,12 +209,12 @@ bool DataWriterImpl::check_new_change_preconditions( } if (change_kind == NOT_ALIVE_UNREGISTERED - || change_kind == NOT_ALIVE_DISPOSED - || change_kind == NOT_ALIVE_DISPOSED_UNREGISTERED) + || change_kind == NOT_ALIVE_DISPOSED + || change_kind == NOT_ALIVE_DISPOSED_UNREGISTERED) { if (!type_->m_isGetKeyDefined) { - logError(PUBLISHER,"Topic is NO_KEY, operation not permitted"); + logError(PUBLISHER, "Topic is NO_KEY, operation not permitted"); return false; } } @@ -230,8 +229,8 @@ bool DataWriterImpl::perform_create_new_change( const InstanceHandle_t& handle) { // Block lowlevel writer - auto max_blocking_time = std::chrono::steady_clock::now() + - std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(qos_.m_reliability.max_blocking_time)); + auto max_blocking_time = steady_clock::now() + + microseconds(::TimeConv::Time_t2MicroSecondsInt64(qos_.m_reliability.max_blocking_time)); #if HAVE_STRICT_REALTIME std::unique_lock lock(writer_->getMutex(), std::defer_lock); @@ -248,7 +247,7 @@ bool DataWriterImpl::perform_create_new_change( //If these two checks are correct, we asume the cachechange is valid and thwn we can write to it. if (!type_->serialize(data, &ch->serializedPayload)) { - logWarning(RTPS_WRITER,"RTPSWriter:Serialization returns false";); + logWarning(RTPS_WRITER, "RTPSWriter:Serialization returns false"; ); history_.release_Cache(ch); return false; } @@ -260,17 +259,18 @@ bool DataWriterImpl::perform_create_new_change( RTPSParticipant* part = publisher_->rtps_participant(); uint32_t max_data_size = writer_->getMaxDataSize(); uint32_t writer_throughput_controller_bytes = - writer_->calculateMaxDataSize(w_att_.throughputController.bytesPerPeriod); + writer_->calculateMaxDataSize(w_att_.throughputController.bytesPerPeriod); uint32_t participant_throughput_controller_bytes = - writer_->calculateMaxDataSize( - part-> getRTPSParticipantAttributes().throughputController.bytesPerPeriod); + writer_->calculateMaxDataSize( + part->getRTPSParticipantAttributes().throughputController.bytesPerPeriod); high_mark_for_frag_ = - max_data_size > writer_throughput_controller_bytes ? - writer_throughput_controller_bytes : - (max_data_size > participant_throughput_controller_bytes ? - participant_throughput_controller_bytes : - max_data_size); + max_data_size > writer_throughput_controller_bytes ? + writer_throughput_controller_bytes : + (max_data_size > participant_throughput_controller_bytes ? + participant_throughput_controller_bytes : + max_data_size); + high_mark_for_frag_ &= ~3; } uint32_t final_high_mark_for_frag = high_mark_for_frag_; @@ -325,7 +325,8 @@ bool DataWriterImpl::perform_create_new_change( if (qos_.m_lifespan.duration != c_TimeInfinite) { - lifespan_duration_us_ = std::chrono::duration>(qos_.m_lifespan.duration.to_ns() * 1e-3); + lifespan_duration_us_ = duration >( + qos_.m_lifespan.duration.to_ns() * 1e-3); lifespan_timer_->update_interval_millisec(qos_.m_lifespan.duration.to_ns() * 1e-6); } else @@ -357,7 +358,7 @@ bool DataWriterImpl::create_new_change_with_params( #if HAVE_SECURITY is_key_protected = writer_->getAttributes().security_attributes().is_key_protected; #endif - type_->getKey(data,&handle,is_key_protected); + type_->getKey(data, &handle, is_key_protected); } return perform_create_new_change(changeKind, data, wparams, handle); @@ -377,7 +378,6 @@ bool DataWriterImpl::create_new_change_with_params( return perform_create_new_change(changeKind, data, wparams, handle); } - bool DataWriterImpl::remove_min_seq_change() { return history_.removeMinChange(); @@ -412,17 +412,17 @@ bool DataWriterImpl::set_attributes( if (att.endpoint.unicastLocatorList.size() != w_att_.endpoint.unicastLocatorList.size() || att.endpoint.multicastLocatorList.size() != w_att_.endpoint.multicastLocatorList.size()) { - logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version"); + logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version"); updated &= false; } else { - for(LocatorListConstIterator lit1 = w_att_.endpoint.unicastLocatorList.begin(); - lit1!=this->w_att_.endpoint.unicastLocatorList.end();++lit1) + for (LocatorListConstIterator lit1 = w_att_.endpoint.unicastLocatorList.begin(); + lit1 != this->w_att_.endpoint.unicastLocatorList.end(); ++lit1) { missing = true; - for(LocatorListConstIterator lit2 = att.endpoint.unicastLocatorList.begin(); - lit2!= att.endpoint.unicastLocatorList.end();++lit2) + for (LocatorListConstIterator lit2 = att.endpoint.unicastLocatorList.begin(); + lit2 != att.endpoint.unicastLocatorList.end(); ++lit2) { if (*lit1 == *lit2) { @@ -432,16 +432,16 @@ bool DataWriterImpl::set_attributes( } if (missing) { - logWarning(PUBLISHER,"Locator: "<< *lit1 << " not present in new list"); - logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version"); + logWarning(PUBLISHER, "Locator: " << *lit1 << " not present in new list"); + logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version"); } } - for(LocatorListConstIterator lit1 = this->w_att_.endpoint.multicastLocatorList.begin(); - lit1!=this->w_att_.endpoint.multicastLocatorList.end();++lit1) + for (LocatorListConstIterator lit1 = this->w_att_.endpoint.multicastLocatorList.begin(); + lit1 != this->w_att_.endpoint.multicastLocatorList.end(); ++lit1) { missing = true; - for(LocatorListConstIterator lit2 = att.endpoint.multicastLocatorList.begin(); - lit2!= att.endpoint.multicastLocatorList.end();++lit2) + for (LocatorListConstIterator lit2 = att.endpoint.multicastLocatorList.begin(); + lit2 != att.endpoint.multicastLocatorList.end(); ++lit2) { if (*lit1 == *lit2) { @@ -451,8 +451,8 @@ bool DataWriterImpl::set_attributes( } if (missing) { - logWarning(PUBLISHER,"Locator: "<< *lit1<< " not present in new list"); - logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version"); + logWarning(PUBLISHER, "Locator: " << *lit1 << " not present in new list"); + logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version"); } } } @@ -492,7 +492,7 @@ ReturnCode_t DataWriterImpl::set_qos( return ReturnCode_t::RETCODE_IMMUTABLE_POLICY; } - qos_.setQos(qos,false); + qos_.setQos(qos, false); //Notify the participant that a Writer has changed its QOS publisher_->rtps_participant()->updateWriter(writer_, topic_att_, qos_); //publisher_->update_writer(this, topic_att_, qos_); @@ -501,7 +501,7 @@ ReturnCode_t DataWriterImpl::set_qos( if (qos_.m_deadline.period != c_TimeInfinite) { deadline_duration_us_ = - duration>(qos_.m_deadline.period.to_ns() * 1e-3); + duration >(qos_.m_deadline.period.to_ns() * 1e-3); deadline_timer_->update_interval_millisec(qos_.m_deadline.period.to_ns() * 1e-6); } else @@ -513,7 +513,7 @@ ReturnCode_t DataWriterImpl::set_qos( if (qos_.m_lifespan.duration != c_TimeInfinite) { lifespan_duration_us_ = - duration>(qos_.m_lifespan.duration.to_ns() * 1e-3); + duration >(qos_.m_lifespan.duration.to_ns() * 1e-3); lifespan_timer_->update_interval_millisec(qos_.m_lifespan.duration.to_ns() * 1e-6); } else @@ -601,7 +601,7 @@ void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost( } ReturnCode_t DataWriterImpl::wait_for_acknowledgments( - const Duration_t &max_wait) + const Duration_t& max_wait) { if (writer_->wait_for_all_acked(max_wait)) { @@ -652,7 +652,7 @@ bool DataWriterImpl::deadline_missed() } ReturnCode_t DataWriterImpl::get_offered_deadline_missed_status( - OfferedDeadlineMissedStatus &status) + OfferedDeadlineMissedStatus& status) { std::unique_lock lock(writer_->getMutex()); @@ -718,9 +718,9 @@ ReturnCode_t DataWriterImpl::get_liveliness_lost_status( ReturnCode_t DataWriterImpl::assert_liveliness() { if (!publisher_->rtps_participant()->wlp()->assert_liveliness( - writer_->getGuid(), - writer_->get_liveliness_kind(), - writer_->get_liveliness_lease_duration())) + writer_->getGuid(), + writer_->get_liveliness_kind(), + writer_->get_liveliness_lease_duration())) { logError(DATAWRITER, "Could not assert liveliness of writer " << writer_->getGuid()); return ReturnCode_t::RETCODE_ERROR; diff --git a/src/cpp/fastrtps_deprecated/publisher/PublisherImpl.cpp b/src/cpp/fastrtps_deprecated/publisher/PublisherImpl.cpp index 5291f6ef556..4078f615031 100644 --- a/src/cpp/fastrtps_deprecated/publisher/PublisherImpl.cpp +++ b/src/cpp/fastrtps_deprecated/publisher/PublisherImpl.cpp @@ -43,10 +43,6 @@ using namespace eprosima::fastrtps; using namespace ::rtps; using namespace std::chrono; -using namespace std::chrono; - -using namespace std::chrono; - using eprosima::fastdds::dds::TopicDataType; PublisherImpl::PublisherImpl( @@ -60,10 +56,10 @@ PublisherImpl::PublisherImpl( , m_att(att) #pragma warning (disable : 4355 ) , m_history(att.topic, - pdatatype->m_typeSize + pdatatype->m_typeSize #if HAVE_SECURITY // In future v2 changepool is in writer, and writer set this value to cachechagepool. - + 20 /*SecureDataHeader*/ + 4 + ((2* 16) /*EVP_MAX_IV_LENGTH max block size*/ - 1 ) /* SecureDataBodey*/ + + 20 /*SecureDataHeader*/ + 4 + ((2 * 16) /*EVP_MAX_IV_LENGTH max block size*/ - 1 ) /* SecureDataBodey*/ + 16 + 4 /*SecureDataTag*/ #endif , att.historyMemoryPolicy) @@ -98,7 +94,7 @@ PublisherImpl::~PublisherImpl() delete(lifespan_timer_); delete(deadline_timer_); - if(mp_writer != nullptr) + if (mp_writer != nullptr) { logInfo(PUBLISHER, this->getGuid().entityId << " in topic: " << this->m_att.topic.topicName); } @@ -107,8 +103,6 @@ PublisherImpl::~PublisherImpl() delete(this->mp_userPublisher); } - - bool PublisherImpl::create_new_change( ChangeKind_t changeKind, void* data) @@ -130,79 +124,80 @@ bool PublisherImpl::create_new_change_with_params( return false; } - if(changeKind == NOT_ALIVE_UNREGISTERED || changeKind == NOT_ALIVE_DISPOSED || + if (changeKind == NOT_ALIVE_UNREGISTERED || changeKind == NOT_ALIVE_DISPOSED || changeKind == NOT_ALIVE_DISPOSED_UNREGISTERED) { - if(m_att.topic.topicKind == NO_KEY) + if (m_att.topic.topicKind == NO_KEY) { - logError(PUBLISHER,"Topic is NO_KEY, operation not permitted"); + logError(PUBLISHER, "Topic is NO_KEY, operation not permitted"); return false; } } InstanceHandle_t handle; - if(m_att.topic.topicKind == WITH_KEY) + if (m_att.topic.topicKind == WITH_KEY) { bool is_key_protected = false; #if HAVE_SECURITY is_key_protected = mp_writer->getAttributes().security_attributes().is_key_protected; #endif - mp_type->getKey(data,&handle,is_key_protected); + mp_type->getKey(data, &handle, is_key_protected); } // Block lowlevel writer - auto max_blocking_time = std::chrono::steady_clock::now() + - std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time)); + auto max_blocking_time = steady_clock::now() + + microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time)); #if HAVE_STRICT_REALTIME std::unique_lock lock(mp_writer->getMutex(), std::defer_lock); - if(lock.try_lock_until(max_blocking_time)) + if (lock.try_lock_until(max_blocking_time)) #else std::unique_lock lock(mp_writer->getMutex()); #endif { CacheChange_t* ch = mp_writer->new_change(mp_type->getSerializedSizeProvider(data), changeKind, handle); - if(ch != nullptr) + if (ch != nullptr) { - if(changeKind == ALIVE) + if (changeKind == ALIVE) { //If these two checks are correct, we asume the cachechange is valid and thwn we can write to it. - if(!mp_type->serialize(data, &ch->serializedPayload)) + if (!mp_type->serialize(data, &ch->serializedPayload)) { - logWarning(RTPS_WRITER,"RTPSWriter:Serialization returns false";); + logWarning(RTPS_WRITER, "RTPSWriter:Serialization returns false"; ); m_history.release_Cache(ch); return false; } } //TODO(Ricardo) This logic in a class. Then a user of rtps layer can use it. - if(high_mark_for_frag_ == 0) + if (high_mark_for_frag_ == 0) { uint32_t max_data_size = mp_writer->getMaxDataSize(); uint32_t writer_throughput_controller_bytes = - mp_writer->calculateMaxDataSize(m_att.throughputController.bytesPerPeriod); + mp_writer->calculateMaxDataSize(m_att.throughputController.bytesPerPeriod); uint32_t participant_throughput_controller_bytes = - mp_writer->calculateMaxDataSize( - mp_rtpsParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod); + mp_writer->calculateMaxDataSize( + mp_rtpsParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod); high_mark_for_frag_ = - max_data_size > writer_throughput_controller_bytes ? - writer_throughput_controller_bytes : - (max_data_size > participant_throughput_controller_bytes ? - participant_throughput_controller_bytes : - max_data_size); + max_data_size > writer_throughput_controller_bytes ? + writer_throughput_controller_bytes : + (max_data_size > participant_throughput_controller_bytes ? + participant_throughput_controller_bytes : + max_data_size); + high_mark_for_frag_ &= ~3; } uint32_t final_high_mark_for_frag = high_mark_for_frag_; // If needed inlineqos for related_sample_identity, then remove the inlinqos size from final fragment size. - if(wparams.related_sample_identity() != SampleIdentity::unknown()) + if (wparams.related_sample_identity() != SampleIdentity::unknown()) { final_high_mark_for_frag -= 32; } // If it is big data, fragment it. - if(ch->serializedPayload.length > final_high_mark_for_frag) + if (ch->serializedPayload.length > final_high_mark_for_frag) { /// Fragment the data. // Set the fragment size to the cachechange. @@ -211,7 +206,7 @@ bool PublisherImpl::create_new_change_with_params( } InstanceHandle_t change_handle = ch->instanceHandle; - if(!this->m_history.add_pub_change(ch, wparams, lock, max_blocking_time)) + if (!this->m_history.add_pub_change(ch, wparams, lock, max_blocking_time)) { m_history.release_Cache(ch); return false; @@ -240,7 +235,8 @@ bool PublisherImpl::create_new_change_with_params( if (m_att.qos.m_lifespan.duration != c_TimeInfinite) { - lifespan_duration_us_ = std::chrono::duration>(m_att.qos.m_lifespan.duration.to_ns() * 1e-3); + lifespan_duration_us_ = duration >( + m_att.qos.m_lifespan.duration.to_ns() * 1e-3); lifespan_timer_->update_interval_millisec(m_att.qos.m_lifespan.duration.to_ns() * 1e-6); lifespan_timer_->restart_timer(); } @@ -252,13 +248,13 @@ bool PublisherImpl::create_new_change_with_params( return false; } - bool PublisherImpl::removeMinSeqChange() { return m_history.removeMinChange(); } -bool PublisherImpl::removeAllChange(size_t* removed) +bool PublisherImpl::removeAllChange( + size_t* removed) { return m_history.removeAllChange(removed); } @@ -267,84 +263,86 @@ const GUID_t& PublisherImpl::getGuid() { return mp_writer->getGuid(); } + // -bool PublisherImpl::updateAttributes(const PublisherAttributes& att) +bool PublisherImpl::updateAttributes( + const PublisherAttributes& att) { bool updated = true; bool missing = false; - if(this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS) + if (this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS) { - if(att.unicastLocatorList.size() != this->m_att.unicastLocatorList.size() || + if (att.unicastLocatorList.size() != this->m_att.unicastLocatorList.size() || att.multicastLocatorList.size() != this->m_att.multicastLocatorList.size()) { - logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version"); + logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version"); updated &= false; } else { - for(LocatorListConstIterator lit1 = this->m_att.unicastLocatorList.begin(); - lit1!=this->m_att.unicastLocatorList.end();++lit1) + for (LocatorListConstIterator lit1 = this->m_att.unicastLocatorList.begin(); + lit1 != this->m_att.unicastLocatorList.end(); ++lit1) { missing = true; - for(LocatorListConstIterator lit2 = att.unicastLocatorList.begin(); - lit2!= att.unicastLocatorList.end();++lit2) + for (LocatorListConstIterator lit2 = att.unicastLocatorList.begin(); + lit2 != att.unicastLocatorList.end(); ++lit2) { - if(*lit1 == *lit2) + if (*lit1 == *lit2) { missing = false; break; } } - if(missing) + if (missing) { - logWarning(PUBLISHER,"Locator: "<< *lit1 << " not present in new list"); - logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version"); + logWarning(PUBLISHER, "Locator: " << *lit1 << " not present in new list"); + logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version"); } } - for(LocatorListConstIterator lit1 = this->m_att.multicastLocatorList.begin(); - lit1!=this->m_att.multicastLocatorList.end();++lit1) + for (LocatorListConstIterator lit1 = this->m_att.multicastLocatorList.begin(); + lit1 != this->m_att.multicastLocatorList.end(); ++lit1) { missing = true; - for(LocatorListConstIterator lit2 = att.multicastLocatorList.begin(); - lit2!= att.multicastLocatorList.end();++lit2) + for (LocatorListConstIterator lit2 = att.multicastLocatorList.begin(); + lit2 != att.multicastLocatorList.end(); ++lit2) { - if(*lit1 == *lit2) + if (*lit1 == *lit2) { missing = false; break; } } - if(missing) + if (missing) { - logWarning(PUBLISHER,"Locator: "<< *lit1<< " not present in new list"); - logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version"); + logWarning(PUBLISHER, "Locator: " << *lit1 << " not present in new list"); + logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version"); } } } } //TOPIC ATTRIBUTES - if(this->m_att.topic != att.topic) + if (this->m_att.topic != att.topic) { - logWarning(PUBLISHER,"Topic Attributes cannot be updated"); + logWarning(PUBLISHER, "Topic Attributes cannot be updated"); updated &= false; } //QOS: //CHECK IF THE QOS CAN BE SET - if(!this->m_att.qos.canQosBeUpdated(att.qos)) + if (!this->m_att.qos.canQosBeUpdated(att.qos)) { - updated &=false; + updated &= false; } - if(updated) + if (updated) { - if(this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS) + if (this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS) { //UPDATE TIMES: StatefulWriter* sfw = (StatefulWriter*)mp_writer; sfw->updateTimes(att.times); } - this->m_att.qos.setQos(att.qos,false); + this->m_att.qos.setQos(att.qos, false); this->m_att = att; //Notify the participant that a Writer has changed its QOS mp_rtpsParticipant->updateWriter(this->mp_writer, m_att.topic, m_att.qos); @@ -354,7 +352,7 @@ bool PublisherImpl::updateAttributes(const PublisherAttributes& att) if (m_att.qos.m_deadline.period != c_TimeInfinite) { deadline_duration_us_ = - duration>(m_att.qos.m_deadline.period.to_ns() * 1e-3); + duration >(m_att.qos.m_deadline.period.to_ns() * 1e-3); deadline_timer_->update_interval_millisec(m_att.qos.m_deadline.period.to_ns() * 1e-6); } else @@ -367,7 +365,7 @@ bool PublisherImpl::updateAttributes(const PublisherAttributes& att) if (m_att.qos.m_lifespan.duration != c_TimeInfinite) { lifespan_duration_us_ = - duration>(m_att.qos.m_lifespan.duration.to_ns() * 1e-3); + duration >(m_att.qos.m_lifespan.duration.to_ns() * 1e-3); lifespan_timer_->update_interval_millisec(m_att.qos.m_lifespan.duration.to_ns() * 1e-6); } else @@ -383,7 +381,7 @@ void PublisherImpl::PublisherWriterListener::onWriterMatched( RTPSWriter* /*writer*/, MatchingInfo& info) { - if( mp_publisherImpl->mp_listener != nullptr ) + if ( mp_publisherImpl->mp_listener != nullptr ) { mp_publisherImpl->mp_listener->onPublicationMatched(mp_publisherImpl->mp_userPublisher, info); } @@ -400,20 +398,21 @@ void PublisherImpl::PublisherWriterListener::onWriterChangeReceivedByAll( } void PublisherImpl::PublisherWriterListener::on_liveliness_lost( - RTPSWriter *writer, - const LivelinessLostStatus &status) + RTPSWriter* writer, + const LivelinessLostStatus& status) { (void)writer; if (mp_publisherImpl->mp_listener != nullptr) { mp_publisherImpl->mp_listener->on_liveliness_lost( - mp_publisherImpl->mp_userPublisher, - status); + mp_publisherImpl->mp_userPublisher, + status); } } -bool PublisherImpl::wait_for_all_acked(const eprosima::fastrtps::Duration_t& max_wait) +bool PublisherImpl::wait_for_all_acked( + const eprosima::fastrtps::Duration_t& max_wait) { return mp_writer->wait_for_all_acked(max_wait); } @@ -460,7 +459,8 @@ bool PublisherImpl::deadline_missed() return deadline_timer_reschedule(); } -void PublisherImpl::get_offered_deadline_missed_status(OfferedDeadlineMissedStatus &status) +void PublisherImpl::get_offered_deadline_missed_status( + OfferedDeadlineMissedStatus& status) { std::unique_lock lock(mp_writer->getMutex()); @@ -509,7 +509,8 @@ bool PublisherImpl::lifespan_expired() return true; } -void PublisherImpl::get_liveliness_lost_status(LivelinessLostStatus &status) +void PublisherImpl::get_liveliness_lost_status( + LivelinessLostStatus& status) { std::unique_lock lock(mp_writer->getMutex()); diff --git a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp index 694a1894fa9..a4e60de2c6a 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp @@ -993,7 +993,7 @@ void PDP::assert_remote_participant_liveliness( CDRMessage_t PDP::get_participant_proxy_data_serialized(Endianness_t endian) { std::lock_guard guardPDP(*this->mp_mutex); - CDRMessage_t cdr_msg; + CDRMessage_t cdr_msg(RTPSMESSAGE_DEFAULT_SIZE); cdr_msg.msg_endian = endian; if (!getLocalParticipantProxyData()->writeToCDRMessage(&cdr_msg, false)) diff --git a/src/cpp/rtps/messages/RTPSMessageCreator.cpp b/src/cpp/rtps/messages/RTPSMessageCreator.cpp index 51259a3ba79..9e8adba88fb 100644 --- a/src/cpp/rtps/messages/RTPSMessageCreator.cpp +++ b/src/cpp/rtps/messages/RTPSMessageCreator.cpp @@ -29,15 +29,6 @@ namespace eprosima { namespace fastrtps { namespace rtps { -RTPSMessageCreator::RTPSMessageCreator() -{ -} - -RTPSMessageCreator::~RTPSMessageCreator() { - logInfo(RTPS_CDR_MSG,"RTPSMessageCreator destructor"); -} - - bool RTPSMessageCreator::addHeader(CDRMessage_t*msg, const GuidPrefix_t& guidPrefix, const ProtocolVersion_t& version,const VendorId_t& vendorId) { diff --git a/src/cpp/rtps/network/ReceiverResource.cpp b/src/cpp/rtps/network/ReceiverResource.cpp index 781131f6bfe..bfb0dedaa8b 100644 --- a/src/cpp/rtps/network/ReceiverResource.cpp +++ b/src/cpp/rtps/network/ReceiverResource.cpp @@ -32,7 +32,6 @@ ReceiverResource::ReceiverResource(TransportInterface& transport, const Locator_ , mValid(false) , mtx() , receiver(nullptr) - , msg(0) { // Internal channel is opened and assigned to this resource. mValid = transport.OpenInputChannel(locator, this, max_size); @@ -55,7 +54,6 @@ ReceiverResource::ReceiverResource(ReceiverResource&& rValueResource) rValueResource.receiver = nullptr; mValid = rValueResource.mValid; rValueResource.mValid = false; - msg = std::move(rValueResource.msg); } bool ReceiverResource::SupportsLocator(const Locator_t& localLocator) @@ -91,10 +89,12 @@ void ReceiverResource::OnDataReceived(const octet * data, const uint32_t size, if (rcv != nullptr) { + CDRMessage_t msg(0); msg.wraps = true; msg.buffer = const_cast(data); msg.length = size; msg.max_size = size; + msg.reserved_size = size; // TODO: Should we unlock in case UnregisterReceiver is called from callback ? rcv->processCDRMsg(remoteLocator, &msg); diff --git a/src/cpp/rtps/transport/ChannelResource.cpp b/src/cpp/rtps/transport/ChannelResource.cpp index 9a35eb76abd..5aaf39c0ab1 100644 --- a/src/cpp/rtps/transport/ChannelResource.cpp +++ b/src/cpp/rtps/transport/ChannelResource.cpp @@ -22,7 +22,7 @@ namespace rtps { using Log = fastdds::dds::Log; ChannelResource::ChannelResource() - : message_buffer_() + : message_buffer_(RTPSMESSAGE_DEFAULT_SIZE) , alive_(true) { logInfo(RTPS_MSG_IN, "Created with CDRMessage of size: " << message_buffer_.max_size); diff --git a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp index 90713bb015d..a44c5fb9c9e 100644 --- a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp +++ b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp @@ -138,7 +138,7 @@ bool RTCPMessageManager::sendData( TCPHeader header; TCPControlMsgHeader ctrlHeader; - CDRMessage_t msg; + CDRMessage_t msg(this->mTransport->get_configuration()->max_message_size()); fastrtps::rtps::CDRMessage::initCDRMsg(&msg); const ResponseCode* code = (respCode != RETCODE_VOID) ? &respCode : nullptr; diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index d2dfb7a2830..36418bcb308 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -223,12 +223,12 @@ void StatefulWriter::unsent_change_added_to_history( { RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time); - if (change->getFragmentCount() > 0) + uint32_t n_fragments = change->getFragmentCount(); + if (n_fragments > 0) { - ChangeForReader_t change_for_reader(change); - change_for_reader.getUnsentFragments().for_each([&](FragmentNumber_t fragment_number) + for (uint32_t frag = 1; frag <= n_fragments; frag++) { - if (group.add_data_frag(*change, fragment_number, expectsInlineQos)) + if (group.add_data_frag(*change, frag, expectsInlineQos)) { for (ReaderProxy* it : matched_readers_) { @@ -237,7 +237,7 @@ void StatefulWriter::unsent_change_added_to_history( bool allFragmentsSent = false; it->mark_fragment_as_sent_for_change( change->sequenceNumber, - fragment_number, + frag, allFragmentsSent); } } @@ -245,9 +245,9 @@ void StatefulWriter::unsent_change_added_to_history( else { logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber << - ", " << fragment_number << ")"); + ", " << frag << ")"); } - }); + } } else { @@ -453,7 +453,6 @@ bool StatefulWriter::intraprocess_heartbeat( return returned_value; } - bool StatefulWriter::change_removed_by_history( CacheChange_t* a_change) { @@ -633,7 +632,7 @@ void StatefulWriter::send_any_unsent_changes() // Point to first change with sequence not less than next_all_acked_notify_sequence_ auto cit = std::lower_bound( - mp_history->changesBegin(), mp_history->changesEnd(), seq, + mp_history->changesBegin(), mp_history->changesEnd(), seq, []( const CacheChange_t* change, const SequenceNumber_t& seq) { @@ -826,7 +825,7 @@ void StatefulWriter::send_any_unsent_changes() else { logError(RTPS_WRITER, "Error sending fragment (" << changeToSend.sequenceNumber << - ", " << changeToSend.fragmentNumber << ")"); + ", " << changeToSend.fragmentNumber << ")"); } } else @@ -1085,7 +1084,7 @@ bool StatefulWriter::matched_reader_add( { mp_RTPSParticipant->async_thread().wake_up(this); } - else if(is_reliable) + else if (is_reliable) { // Send Gap gap_builder.flush(); diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index f720fe8cf88..28852b26dbf 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -178,18 +178,17 @@ void StatelessWriter::unsent_change_added_to_history( { RTPSMessageGroup group(mp_RTPSParticipant, this, it, max_blocking_time); - if(change->getFragmentCount() > 0) + uint32_t n_fragments = change->getFragmentCount(); + if (n_fragments > 0) { - ChangeForReader_t change_for_reader(change); - change_for_reader.getUnsentFragments().for_each([this, &group, change](FragmentNumber_t fragment_number) + for (uint32_t frag = 1; frag <= n_fragments; frag++) { - if (!group.add_data_frag(*change, fragment_number, - is_inline_qos_expected_)) + if (!group.add_data_frag(*change, frag, is_inline_qos_expected_)) { logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber << - ", " << fragment_number << ")"); + ", " << frag << ")"); } - }); + } } else { @@ -211,22 +210,21 @@ void StatelessWriter::unsent_change_added_to_history( } } - if(there_are_remote_readers_ || !fixed_locators_.empty()) + if (there_are_remote_readers_ || !fixed_locators_.empty()) { RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time); - if(change->getFragmentCount() > 0) + uint32_t n_fragments = change->getFragmentCount(); + if (n_fragments > 0) { - ChangeForReader_t change_for_reader(change); - change_for_reader.getUnsentFragments().for_each([this, &group, change](FragmentNumber_t fragment_number) + for (uint32_t frag = 1; frag <= n_fragments; frag++) { - if (!group.add_data_frag(*change, fragment_number, - is_inline_qos_expected_)) + if (!group.add_data_frag(*change, frag, is_inline_qos_expected_)) { logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber << - ", " << fragment_number << ")"); + ", " << frag << ")"); } - }); + } } else { @@ -359,90 +357,99 @@ void StatelessWriter::send_any_unsent_changes() //TODO(Mcc) Separate sending for asynchronous writers std::lock_guard guard(mp_mutex); - RTPSWriterCollector changesToSend; - - for (const ChangeForReader_t& unsentChange : unsent_changes_) + bool flow_controllers_limited = false; + while (!unsent_changes_.empty() && !flow_controllers_limited) { - CacheChange_t* cache_change = unsentChange.getChange(); - changesToSend.add_change(cache_change, nullptr, unsentChange.getUnsentFragments()); + RTPSWriterCollector changesToSend; - uint64_t sequence_number = cache_change->sequenceNumber.to64long(); - // Filter intraprocess unsent changes - if (sequence_number > last_intraprocess_sequence_number_) + for (const ChangeForReader_t& unsentChange : unsent_changes_) { - last_intraprocess_sequence_number_ = sequence_number; - for (ReaderLocator& it : matched_readers_) + CacheChange_t* cache_change = unsentChange.getChange(); + changesToSend.add_change(cache_change, nullptr, unsentChange.getUnsentFragments()); + + uint64_t sequence_number = cache_change->sequenceNumber.to64long(); + // Filter intraprocess unsent changes + if (sequence_number > last_intraprocess_sequence_number_) { - if (it.is_local_reader()) + last_intraprocess_sequence_number_ = sequence_number; + for (ReaderLocator& it : matched_readers_) { - intraprocess_delivery(cache_change, it); + if (it.is_local_reader()) + { + intraprocess_delivery(cache_change, it); + } } } } - } - - if (there_are_remote_readers_ || !fixed_locators_.empty()) - { - // Clear through local controllers - for (auto& controller : flow_controllers_) - { - (*controller)(changesToSend); - } - // Clear through parent controllers - for (auto& controller : mp_RTPSParticipant->getFlowControllers()) + if (there_are_remote_readers_ || !fixed_locators_.empty()) { - (*controller)(changesToSend); - } + // Read number of changes to send to detect if flow controllers limited them + size_t n_items = changesToSend.size(); - try - { - RTPSMessageGroup group(mp_RTPSParticipant, this, *this); + // Clear through local controllers + for (auto& controller : flow_controllers_) + { + (*controller)(changesToSend); + } - bool bHasListener = mp_listener != nullptr; - while (!changesToSend.empty()) + // Clear through parent controllers + for (auto& controller : mp_RTPSParticipant->getFlowControllers()) { - RTPSWriterCollector::Item changeToSend = changesToSend.pop(); + (*controller)(changesToSend); + } - // Remove the messages selected for sending from the original list, - // and update those that were fragmented with the new sent index - update_unsent_changes(changeToSend.sequenceNumber, changeToSend.fragmentNumber); + flow_controllers_limited = n_items != changesToSend.size(); - // Notify the controllers - FlowController::NotifyControllersChangeSent(changeToSend.cacheChange); + try + { + RTPSMessageGroup group(mp_RTPSParticipant, this, *this); - if (changeToSend.fragmentNumber != 0) + bool bHasListener = mp_listener != nullptr; + while (!changesToSend.empty()) { - if (!group.add_data_frag(*changeToSend.cacheChange, changeToSend.fragmentNumber, - is_inline_qos_expected_)) + RTPSWriterCollector::Item changeToSend = changesToSend.pop(); + + // Remove the messages selected for sending from the original list, + // and update those that were fragmented with the new sent index + update_unsent_changes(changeToSend.sequenceNumber, changeToSend.fragmentNumber); + + // Notify the controllers + FlowController::NotifyControllersChangeSent(changeToSend.cacheChange); + + if (changeToSend.fragmentNumber != 0) { - logError(RTPS_WRITER, "Error sending fragment (" << changeToSend.sequenceNumber << - ", " << changeToSend.fragmentNumber << ")"); + if (!group.add_data_frag(*changeToSend.cacheChange, changeToSend.fragmentNumber, + is_inline_qos_expected_)) + { + logError(RTPS_WRITER, "Error sending fragment (" << changeToSend.sequenceNumber << + ", " << changeToSend.fragmentNumber << ")"); + } } - } - else - { - if (!group.add_data(*changeToSend.cacheChange, is_inline_qos_expected_)) + else { - logError(RTPS_WRITER, "Error sending change " << changeToSend.sequenceNumber); + if (!group.add_data(*changeToSend.cacheChange, is_inline_qos_expected_)) + { + logError(RTPS_WRITER, "Error sending change " << changeToSend.sequenceNumber); + } } - } - if (bHasListener && is_acked_by_all(changeToSend.cacheChange)) - { - mp_listener->onWriterChangeReceivedByAll(this, changeToSend.cacheChange); + if (bHasListener && is_acked_by_all(changeToSend.cacheChange)) + { + mp_listener->onWriterChangeReceivedByAll(this, changeToSend.cacheChange); + } } } + catch (const RTPSMessageGroup::timeout&) + { + logError(RTPS_WRITER, "Max blocking time reached"); + } } - catch (const RTPSMessageGroup::timeout&) + else { - logError(RTPS_WRITER, "Max blocking time reached"); + unsent_changes_.clear(); } } - else - { - unsent_changes_.clear(); - } logInfo(RTPS_WRITER, "Finish sending unsent changes"); } diff --git a/test/blackbox/BlackboxTestsPubSubFragments.cpp b/test/blackbox/BlackboxTestsPubSubFragments.cpp index 6e2ebc7988e..b52bc981464 100644 --- a/test/blackbox/BlackboxTestsPubSubFragments.cpp +++ b/test/blackbox/BlackboxTestsPubSubFragments.cpp @@ -17,8 +17,6 @@ #include "PubSubReader.hpp" #include "PubSubWriter.hpp" -#include - #include #include #include @@ -51,69 +49,123 @@ class PubSubFragments : public testing::TestWithParam } } -}; +protected: -TEST_P(PubSubFragments, PubSubAsNonReliableData300kb) -{ - PubSubReader reader(TEST_TOPIC_NAME); - PubSubWriter writer(TEST_TOPIC_NAME); + void do_fragment_test( + const std::string& topic_name, + std::list& data, + bool asynchronous, + bool reliable, + bool small_fragments) + { + PubSubReader reader(topic_name); + PubSubWriter writer(topic_name); - //Increasing reception buffer to accomodate large and fast fragments - reader.socket_buffer_size(1048576).init(); + reader + .socket_buffer_size(1048576) // accomodate large and fast fragments + .history_depth(static_cast(data.size())) + .reliability(reliable ? + eprosima::fastrtps::RELIABLE_RELIABILITY_QOS : + eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) + .init(); - ASSERT_TRUE(reader.isInitialized()); + ASSERT_TRUE(reader.isInitialized()); - writer.reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS).init(); + if (small_fragments) + { + auto testTransport = std::make_shared(); + testTransport->sendBufferSize = 1024; + testTransport->maxMessageSize = 1024; + testTransport->receiveBufferSize = 65536; + writer.disable_builtin_transport(); + writer.add_user_transport_to_pparams(testTransport); + } - ASSERT_TRUE(writer.isInitialized()); + if (asynchronous) + { + writer.asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE); + } - // Because its volatile the durability - // Wait for discovery. - writer.wait_discovery(); - reader.wait_discovery(); + writer + .history_depth(static_cast(data.size())) + .reliability(reliable ? + eprosima::fastrtps::RELIABLE_RELIABILITY_QOS : + eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) + .init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + reader.startReception(data); + // Send data + writer.send(data, reliable ? 0u : 10u); + ASSERT_TRUE(data.empty()); + + // Block reader until reception finished or timeout. + if (reliable) + { + reader.block_for_all(); + } + else + { + reader.block_for_at_least(2); + } + } +}; +TEST_P(PubSubFragments, PubSubAsNonReliableData300kb) +{ auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, false, false, false); +} - reader.startReception(data); - // Send data - writer.send(data); - ASSERT_TRUE(data.empty()); - // Block reader until reception finished or timeout. - reader.block_for_at_least(2); +TEST_P(PubSubFragments, PubSubAsNonReliableData300kbSmallFragments) +{ + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, false, false, true); } TEST_P(PubSubFragments, PubSubAsReliableData300kb) { - PubSubReader reader(TEST_TOPIC_NAME); - PubSubWriter writer(TEST_TOPIC_NAME); - - reader.history_depth(10). - reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).init(); - - ASSERT_TRUE(reader.isInitialized()); - - writer.history_depth(10). - reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).init(); + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, false, true, false); +} - ASSERT_TRUE(writer.isInitialized()); +TEST_P(PubSubFragments, PubSubAsReliableData300kbSmallFragments) +{ + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, false, true, true); +} - // Because its volatile the durability - // Wait for discovery. - writer.wait_discovery(); - reader.wait_discovery(); +TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kb) +{ + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, true, false, false); +} +TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kbSmallFragments) +{ auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, true, false, true); +} - reader.startReception(data); +TEST_P(PubSubFragments, AsyncPubSubAsReliableData300kb) +{ + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, true, true, false); +} - // Send data - writer.send(data); - ASSERT_TRUE(data.empty()); - // Block reader until reception finished or timeout. - reader.block_for_all(); +TEST_P(PubSubFragments, AsyncPubSubAsReliableData300kbSmallFragments) +{ + auto data = default_data300kb_data_generator(); + do_fragment_test(TEST_TOPIC_NAME, data, true, true, true); } -TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kb) +TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kbWithFlowControl) { PubSubReader reader(TEST_TOPIC_NAME); PubSubWriter writer(TEST_TOPIC_NAME); @@ -150,7 +202,7 @@ TEST_P(PubSubFragments, AsyncPubSubAsNonReliableData300kb) reader.block_for_at_least(2); } -TEST_P(PubSubFragments, AsyncPubSubAsReliableData300kb) +TEST_P(PubSubFragments, AsyncPubSubAsReliableData300kbWithFlowControl) { PubSubReader reader(TEST_TOPIC_NAME); PubSubWriter writer(TEST_TOPIC_NAME); @@ -298,68 +350,6 @@ TEST(PubSubFragments, AsyncPubSubAsReliableData300kbInLossyConditionsSmallFragme testTransport->dropLogLength); } -// Test introduced to verify the fix of the bug (#7609 Do not reuse cache change if sample does not fit) -// detected in relase 1.9.4 -TEST(PubSubFragments, AsyncPubSubAsBestEffortAlternateSizeInLossyConditions) -{ - PubSubReader reader(TEST_TOPIC_NAME); - PubSubWriter writer(TEST_TOPIC_NAME); - - auto reader_transport = std::make_shared(); - reader_transport->interfaceWhiteList.push_back("127.0.0.1"); - - reader - .disable_builtin_transport() - .add_user_transport_to_pparams(reader_transport) - .history_depth(5) - .reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) - .mem_policy(eprosima::fastrtps::rtps::DYNAMIC_RESERVE_MEMORY_MODE) - .init(); - - ASSERT_TRUE(reader.isInitialized()); - - - // To simulate lossy conditions, we are going to remove the default - // bultin transport, and instead use a lossy shim layer variant. - auto testTransport = std::make_shared(); - testTransport->sendBufferSize = 65536; - testTransport->receiveBufferSize = 65536; - // We drop 50% of all data frags - testTransport->dropDataFragMessagesPercentage = 50; - testTransport->dropLogLength = 10; - // Only one interface in order to really drop 50% of packages!!! - testTransport->interfaceWhiteList.push_back("127.0.0.1"); - - writer - .disable_builtin_transport() - .add_user_transport_to_pparams(testTransport) - .history_depth(5) - .asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE) - .init(); - - ASSERT_TRUE(writer.isInitialized()); - - // Because its volatile the durability - // Wait for discovery. - writer.wait_discovery(); - reader.wait_discovery(); - - auto data = default_data96kb_data300kb_data_generator(2); - - reader.startReception(data); - writer.send(data); - - // All data has 7 fragments so when 3 has been lost all data has been sent - // Wait until then - while(eprosima::fastrtps::rtps::test_UDPv4Transport::test_UDPv4Transport_DropLog.size() < 3) - { - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - - // A second should be enough time to assure all data has been received - std::this_thread::sleep_for(std::chrono::seconds(1)); -} - TEST(PubSubFragments, AsyncFragmentSizeTest) { // ThroghputController size large than maxMessageSize. @@ -456,6 +446,7 @@ TEST(PubSubFragments, AsyncFragmentSizeTest) } } + INSTANTIATE_TEST_CASE_P(PubSubFragments, PubSubFragments, testing::Values(false, true), @@ -466,4 +457,3 @@ INSTANTIATE_TEST_CASE_P(PubSubFragments, } return "NonIntraprocess"; }); - diff --git a/test/unittest/rtps/security/SecurityTests.hpp b/test/unittest/rtps/security/SecurityTests.hpp index 1e496e7bcb5..84b1ebf879d 100644 --- a/test/unittest/rtps/security/SecurityTests.hpp +++ b/test/unittest/rtps/security/SecurityTests.hpp @@ -124,7 +124,8 @@ class SecurityTest : public ::testing::Test crypto_plugin_(new MockCryptographyPlugin()), stateless_writer_(nullptr), stateless_reader_(nullptr), volatile_writer_(nullptr), volatile_reader_(nullptr), - manager_(&participant_), participant_data_(c_default_RTPSParticipantAllocationAttributes) {} + manager_(&participant_), participant_data_(c_default_RTPSParticipantAllocationAttributes), + default_cdr_message(RTPSMESSAGE_DEFAULT_SIZE){} ~SecurityTest() { diff --git a/test/unittest/security/authentication/AuthenticationPluginTests.hpp b/test/unittest/security/authentication/AuthenticationPluginTests.hpp index d790de93fe4..146be778366 100644 --- a/test/unittest/security/authentication/AuthenticationPluginTests.hpp +++ b/test/unittest/security/authentication/AuthenticationPluginTests.hpp @@ -195,7 +195,7 @@ TEST_F(AuthenticationPluginTest, handshake_process_ok) eprosima::fastrtps::rtps::security::HandshakeMessageToken *handshake_message = nullptr; eprosima::fastrtps::rtps::ParticipantProxyData participant_data1(eprosima::fastrtps::rtps::c_default_RTPSParticipantAllocationAttributes); participant_data1.m_guid = adjusted_participant_key1; - eprosima::fastrtps::rtps::CDRMessage_t auxMsg; + eprosima::fastrtps::rtps::CDRMessage_t auxMsg(RTPSMESSAGE_DEFAULT_SIZE); auxMsg.msg_endian = eprosima::fastrtps::rtps::BIGEND; ASSERT_TRUE(participant_data1.writeToCDRMessage(&auxMsg, false)); diff --git a/test/unittest/security/cryptography/CryptographyPluginTests.hpp b/test/unittest/security/cryptography/CryptographyPluginTests.hpp index a02051bc218..bb9588353f0 100644 --- a/test/unittest/security/cryptography/CryptographyPluginTests.hpp +++ b/test/unittest/security/cryptography/CryptographyPluginTests.hpp @@ -348,9 +348,9 @@ TEST_F(CryptographyPluginTest, transform_RTPSMessage) CryptoPlugin->keyexchange()->set_remote_participant_crypto_tokens(*ParticipantB,*ParticipantB_remote,ParticipantA_CryptoTokens,exception); //Perform sample message exchange - eprosima::fastrtps::rtps::CDRMessage_t plain_rtps_message; - eprosima::fastrtps::rtps::CDRMessage_t encoded_rtps_message; - eprosima::fastrtps::rtps::CDRMessage_t decoded_rtps_message; + eprosima::fastrtps::rtps::CDRMessage_t plain_rtps_message(RTPSMESSAGE_DEFAULT_SIZE); + eprosima::fastrtps::rtps::CDRMessage_t encoded_rtps_message(RTPSMESSAGE_DEFAULT_SIZE); + eprosima::fastrtps::rtps::CDRMessage_t decoded_rtps_message(RTPSMESSAGE_DEFAULT_SIZE); char message[] = "RPTSMessage"; //Length 11 memcpy(plain_rtps_message.buffer, message, 11); @@ -1374,9 +1374,9 @@ TEST_F(CryptographyPluginTest, transform_preprocess_secure_submessage) ASSERT_TRUE( P_B->Readers.size() == 1); //Perform sample message exchange - eprosima::fastrtps::rtps::CDRMessage_t plain_payload; - eprosima::fastrtps::rtps::CDRMessage_t encoded_datareader_payload; - eprosima::fastrtps::rtps::CDRMessage_t encoded_datawriter_payload; + eprosima::fastrtps::rtps::CDRMessage_t plain_payload(RTPSMESSAGE_DEFAULT_SIZE); + eprosima::fastrtps::rtps::CDRMessage_t encoded_datareader_payload(RTPSMESSAGE_DEFAULT_SIZE); + eprosima::fastrtps::rtps::CDRMessage_t encoded_datawriter_payload(RTPSMESSAGE_DEFAULT_SIZE); char message[] = "My goose is cooked"; //Length 18 memcpy(plain_payload.buffer, message, 18);