diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index ec60678f586..2b193eb1a42 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -56,6 +56,13 @@ namespace eprosima { namespace fastdds { namespace dds { +static bool qos_has_pull_mode_request( + const DataWriterQos& qos) +{ + auto push_mode = PropertyPolicyHelper::find_property(qos.properties(), "fastdds.push_mode"); + return (nullptr != push_mode) && ("false" == *push_mode); +} + class DataWriterImpl::LoanCollection { public: @@ -1335,6 +1342,20 @@ ReturnCode_t DataWriterImpl::check_qos( logError(RTPS_QOS_CHECK, "Unique network flows not supported on writers"); return ReturnCode_t::RETCODE_UNSUPPORTED; } + bool is_pull_mode = qos_has_pull_mode_request(qos); + if (is_pull_mode) + { + if (BEST_EFFORT_RELIABILITY_QOS == qos.reliability().kind) + { + logError(RTPS_QOS_CHECK, "BEST_EFFORT incompatible with pull mode"); + return ReturnCode_t::RETCODE_INCONSISTENT_POLICY; + } + if (c_TimeInfinite == qos.reliable_writer_qos().times.heartbeatPeriod) + { + logError(RTPS_QOS_CHECK, "Infinite heartbeat period incompatible with pull mode"); + return ReturnCode_t::RETCODE_INCONSISTENT_POLICY; + } + } if (qos.reliability().kind == BEST_EFFORT_RELIABILITY_QOS && qos.ownership().kind == EXCLUSIVE_OWNERSHIP_QOS) { logError(RTPS_QOS_CHECK, "BEST_EFFORT incompatible with EXCLUSIVE ownership"); diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 83f613e05db..99bfe1fcfc3 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -312,26 +312,35 @@ void StatefulWriter::init( { const RTPSParticipantAttributes& part_att = pimpl->getRTPSParticipantAttributes(); - periodic_hb_event_ = new TimedEvent(pimpl->getEventResource(), [&]() -> bool - { - return send_periodic_heartbeat(); - }, - 
TimeConv::Time_t2MilliSecondsDouble(m_times.heartbeatPeriod)); + auto push_mode = PropertyPolicyHelper::find_property(att.endpoint.properties, "fastdds.push_mode"); + m_pushMode = !((nullptr != push_mode) && ("false" == *push_mode)); - nack_response_event_ = new TimedEvent(pimpl->getEventResource(), [&]() -> bool - { - perform_nack_response(); - return false; - }, - TimeConv::Time_t2MilliSecondsDouble(m_times.nackResponseDelay)); + periodic_hb_event_ = new TimedEvent( + pimpl->getEventResource(), + [&]() -> bool + { + return send_periodic_heartbeat(); + }, + TimeConv::Time_t2MilliSecondsDouble(m_times.heartbeatPeriod)); + + nack_response_event_ = new TimedEvent( + pimpl->getEventResource(), + [&]() -> bool + { + perform_nack_response(); + return false; + }, + TimeConv::Time_t2MilliSecondsDouble(m_times.nackResponseDelay)); if (disable_positive_acks_) { - ack_event_ = new TimedEvent(pimpl->getEventResource(), [&]() -> bool - { - return ack_timer_expired(); - }, - att.keep_duration.to_ns() * 1e-6); // in milliseconds + ack_event_ = new TimedEvent( + pimpl->getEventResource(), + [&]() -> bool + { + return ack_timer_expired(); + }, + att.keep_duration.to_ns() * 1e-6); // in milliseconds } for (size_t n = 0; n < att.matched_readers_allocation.initial; ++n) @@ -439,14 +448,17 @@ void StatefulWriter::async_delivery( CacheChange_t* change, const std::chrono::time_point& max_blocking_time) { + bool should_wake_up = false; + for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, - [this, &change, &max_blocking_time](ReaderProxy* reader) + [this, &should_wake_up, &change, &max_blocking_time](ReaderProxy* reader) { ChangeForReader_t changeForReader(change); - if (m_pushMode) + if (m_pushMode || !reader->is_reliable() || reader->is_local_reader()) { changeForReader.setStatus(UNSENT); + should_wake_up = true; } else { @@ -459,10 +471,14 @@ void StatefulWriter::async_delivery( } ); - if (m_pushMode) + if (should_wake_up) { 
mp_RTPSParticipant->async_thread().wake_up(this, max_blocking_time); } + else + { + periodic_hb_event_->restart_timer(max_blocking_time); + } } void StatefulWriter::sync_delivery( @@ -472,55 +488,87 @@ void StatefulWriter::sync_delivery( //TODO(Ricardo) Temporal. bool expectsInlineQos = false; + bool should_be_sent = false; + locator_selector_.reset(false); + // First step is to add the new CacheChange_t to all reader proxies. // It has to be done before sending, because if a timeout is caught, we will not include the // CacheChange_t in some reader proxies. for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, - [this, &change, &max_blocking_time, &expectsInlineQos](ReaderProxy* reader) + [this, &change, &max_blocking_time, &expectsInlineQos, &should_be_sent](ReaderProxy* reader) { ChangeForReader_t changeForReader(change); + bool send_to_this_reader = false; - if (m_pushMode) + if (!reader->is_reliable()) + { + changeForReader.setStatus(ACKNOWLEDGED); + send_to_this_reader = true; + } + else { - if (reader->is_reliable()) + if (m_pushMode || reader->is_local_reader()) { changeForReader.setStatus(UNDERWAY); + send_to_this_reader = true; } else { - changeForReader.setStatus(ACKNOWLEDGED); + changeForReader.setStatus(UNACKNOWLEDGED); } } - else - { - changeForReader.setStatus(UNACKNOWLEDGED); - } changeForReader.setRelevance(reader->rtps_is_relevant(change)); reader->add_change(changeForReader, true, max_blocking_time); expectsInlineQos |= reader->expects_inline_qos(); + if (send_to_this_reader) + { + if (reader->is_local_reader()) + { + intraprocess_heartbeat(reader, false); + bool delivered = !changeForReader.isRelevant() || intraprocess_delivery(change, reader); + reader->set_change_to_status( + change->sequenceNumber, + delivered ? 
ACKNOWLEDGED : UNDERWAY, + false); + } + else if (reader->is_datasharing_reader()) + { + reader->datasharing_notify(); + reader->set_change_to_status(change->sequenceNumber, UNDERWAY, false); + } + else + { + should_be_sent = true; + locator_selector_.enable(reader->guid()); + } + } + return false; } ); - // Notify the datasharing readers - for (ReaderProxy* reader : matched_datasharing_readers_) + //At this point we are sure all information was stored. We now can send data. + if (!should_be_sent) { - reader->datasharing_notify(); - reader->set_change_to_status(change->sequenceNumber, UNDERWAY, false); + if (getMatchedReadersSize() > 0) + { + periodic_hb_event_->restart_timer(max_blocking_time); + } } - - if (!matched_remote_readers_.empty() || !matched_local_readers_.empty()) + else { - try { - //At this point we are sure all information was stored. We now can send data. if (!m_separateSendingEnabled) { if (locator_selector_.selected_size() > 0) { + NetworkFactory& network = mp_RTPSParticipant->network_factory(); + network.select_locators(locator_selector_); + compute_selected_guids(); + RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time); auto sent_fun = [this, change]( @@ -542,28 +590,9 @@ void StatefulWriter::sync_delivery( send_data_or_fragments(group, change, expectsInlineQos, sent_fun); send_heartbeat_nts_(all_remote_readers_.size(), group, disable_positive_acks_); } - - for (ReaderProxy* it : matched_local_readers_) - { - intraprocess_heartbeat(it, false); - bool delivered = intraprocess_delivery(change, it); - it->set_change_to_status( - change->sequenceNumber, - delivered ? ACKNOWLEDGED : UNDERWAY, - false); - } } else { - for (ReaderProxy* it : matched_local_readers_) - { - intraprocess_heartbeat(it, false); - bool delivered = intraprocess_delivery(change, it); - it->set_change_to_status( - change->sequenceNumber, - delivered ? 
ACKNOWLEDGED : UNDERWAY, - false); - } for (ReaderProxy* it : matched_remote_readers_) { RTPSMessageGroup group(mp_RTPSParticipant, this, it->message_sender(), @@ -585,10 +614,7 @@ void StatefulWriter::sync_delivery( } } - if (there_are_remote_readers_) - { - periodic_hb_event_->restart_timer(max_blocking_time); - } + periodic_hb_event_->restart_timer(max_blocking_time); if (disable_positive_acks_ && last_sequence_number_ == SequenceNumber_t()) { @@ -601,6 +627,14 @@ void StatefulWriter::sync_delivery( } } + if (!m_pushMode) + { + NetworkFactory& network = mp_RTPSParticipant->network_factory(); + locator_selector_.reset(true); + network.select_locators(locator_selector_); + compute_selected_guids(); + } + check_acked_status(); } @@ -775,7 +809,7 @@ void StatefulWriter::send_any_unsent_changes() bool activateHeartbeatPeriod = false; SequenceNumber_t max_sequence = mp_history->next_sequence_number(); - if (!m_pushMode || mp_history->getHistorySize() == 0 || getMatchedReadersSize() == 0) + if (mp_history->getHistorySize() == 0 || getMatchedReadersSize() == 0) { send_heartbeat_to_all_readers(); } @@ -809,20 +843,11 @@ void StatefulWriter::send_any_unsent_changes() void StatefulWriter::send_heartbeat_to_all_readers() { - // This version is called when any of the following conditions is satisfied: - // a) push mode is false - // b) history is empty - // c) there are no matched readers - - for (ReaderProxy* reader : matched_local_readers_) - { - intraprocess_heartbeat(reader); - } + // This version is called from send_any_unsent_changes when any of the following conditions is satisfied: + // a) history is empty + // b) there are no matched readers - for (ReaderProxy* reader : matched_datasharing_readers_) - { - reader->datasharing_notify(); - } + // It may also be called from send_periodic_heartbeat if (m_separateSendingEnabled) { @@ -833,6 +858,16 @@ void StatefulWriter::send_heartbeat_to_all_readers() } else { + for (ReaderProxy* reader : matched_local_readers_) + { + 
intraprocess_heartbeat(reader); + } + + for (ReaderProxy* reader : matched_datasharing_readers_) + { + reader->datasharing_notify(); + } + if (there_are_remote_readers_) { RTPSMessageGroup group(mp_RTPSParticipant, this, *this); @@ -846,10 +881,9 @@ void StatefulWriter::send_changes_separatedly( bool& activateHeartbeatPeriod) { // This version is called when all of the following conditions are satisfied: - // a) push mode is true - // b) history is not empty - // c) there is at least one matched reader - // d) separate sending is enabled + // a) history is not empty + // b) there is at least one matched reader + // c) separate sending is enabled // Process datasharing then if (there_are_datasharing_readers_) @@ -1061,11 +1095,10 @@ void StatefulWriter::send_all_unsent_changes( bool& activateHeartbeatPeriod) { // This version is called when all of the following conditions are satisfied: - // a) push mode is true - // b) history is not empty - // c) there is at least one matched reader - // d) separate sending is disabled - // e) either all matched readers are local or no flow controllers are configured + // a) history is not empty + // b) there is at least one matched reader + // c) separate sending is disabled + // d) either all matched readers are local or no flow controllers are configured // Process intraprocess first if (there_are_local_readers_) @@ -1194,11 +1227,10 @@ void StatefulWriter::send_unsent_changes_with_flow_control( bool& activateHeartbeatPeriod) { // This version is called when all of the following conditions are satisfied: - // a) push mode is true - // b) history is not empty - // c) there is at least one matched reader - // d) separate sending is disabled - // e) there is at least one remote matched reader and flow controllers are configured + // a) history is not empty + // b) there is at least one matched reader + // c) separate sending is disabled + // d) there is at least one remote matched reader and flow controllers are configured // 
Process intraprocess first if (there_are_local_readers_) @@ -2144,32 +2176,7 @@ bool StatefulWriter::send_periodic_heartbeat( std::lock_guard guardW(mp_mutex); bool unacked_changes = false; - if (m_separateSendingEnabled) - { - if (liveliness) - { - for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, - [this, &liveliness, &unacked_changes](ReaderProxy* reader) - { - send_heartbeat_to_nts(*reader, liveliness); - unacked_changes = true; - return false; - } - ); - } - else - { - for (ReaderProxy* reader : matched_remote_readers_) - { - if (reader->has_unacknowledged()) - { - send_heartbeat_to_nts(*reader, liveliness); - unacked_changes = true; - } - } - } - } - else if (!liveliness) + if (!liveliness) { SequenceNumber_t firstSeq, lastSeq; @@ -2196,8 +2203,7 @@ bool StatefulWriter::send_periodic_heartbeat( { try { - RTPSMessageGroup group(mp_RTPSParticipant, this, *this); - send_heartbeat_nts_(all_remote_readers_.size(), group, disable_positive_acks_, liveliness); + send_heartbeat_to_all_readers(); } catch (const RTPSMessageGroup::timeout&) { @@ -2206,6 +2212,17 @@ bool StatefulWriter::send_periodic_heartbeat( } } } + else if (m_separateSendingEnabled) + { + for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, + [this, &liveliness, &unacked_changes](ReaderProxy* reader) + { + send_heartbeat_to_nts(*reader, liveliness); + unacked_changes = true; + return false; + } + ); + } else { // This is a liveliness heartbeat, we don't care about checking sequence numbers @@ -2214,6 +2231,7 @@ bool StatefulWriter::send_periodic_heartbeat( for (ReaderProxy* reader : matched_local_readers_) { intraprocess_heartbeat(reader, true); + unacked_changes = true; } for (ReaderProxy* reader : matched_datasharing_readers_) @@ -2222,10 +2240,15 @@ bool StatefulWriter::send_periodic_heartbeat( assert(p); p->assert_liveliness(); reader->datasharing_notify(); + unacked_changes = true; } - RTPSMessageGroup 
group(mp_RTPSParticipant, this, *this); - send_heartbeat_nts_(all_remote_readers_.size(), group, final, liveliness); + if (there_are_remote_readers_) + { + unacked_changes = true; + RTPSMessageGroup group(mp_RTPSParticipant, this, *this); + send_heartbeat_nts_(all_remote_readers_.size(), group, final, liveliness); + } } catch (const RTPSMessageGroup::timeout&) { @@ -2241,26 +2264,37 @@ void StatefulWriter::send_heartbeat_to_nts( bool liveliness, bool force /* = false */) { - if (remoteReaderProxy.is_remote_and_reliable() && (force || liveliness || remoteReaderProxy.has_unacknowledged())) + if (remoteReaderProxy.is_reliable() && (force || liveliness || remoteReaderProxy.has_unacknowledged())) { - try + if (remoteReaderProxy.is_local_reader()) + { + intraprocess_heartbeat(&remoteReaderProxy, liveliness); + } + else if (remoteReaderProxy.is_datasharing_reader()) { - RTPSMessageGroup group(mp_RTPSParticipant, this, remoteReaderProxy.message_sender()); - send_heartbeat_nts_(1u, group, disable_positive_acks_, liveliness); - SequenceNumber_t first_seq = get_seq_num_min(); - if (first_seq != c_SequenceNumber_Unknown) + remoteReaderProxy.datasharing_notify(); + } + else + { + try { - SequenceNumber_t first_relevant = remoteReaderProxy.first_relevant_sequence_number(); - if (remoteReaderProxy.durability_kind() == VOLATILE && first_seq < first_relevant) + RTPSMessageGroup group(mp_RTPSParticipant, this, remoteReaderProxy.message_sender()); + send_heartbeat_nts_(1u, group, disable_positive_acks_, liveliness); + SequenceNumber_t first_seq = get_seq_num_min(); + if (first_seq != c_SequenceNumber_Unknown) { - group.add_gap(first_seq, SequenceNumberSet_t(first_relevant)); + SequenceNumber_t first_relevant = remoteReaderProxy.first_relevant_sequence_number(); + if (remoteReaderProxy.durability_kind() == VOLATILE && first_seq < first_relevant) + { + group.add_gap(first_seq, SequenceNumberSet_t(first_relevant)); + } + remoteReaderProxy.send_gaps(group, 
mp_history->next_sequence_number()); } - remoteReaderProxy.send_gaps(group, mp_history->next_sequence_number()); } - } - catch (const RTPSMessageGroup::timeout&) - { - logError(RTPS_WRITER, "Max blocking time reached"); + catch (const RTPSMessageGroup::timeout&) + { + logError(RTPS_WRITER, "Max blocking time reached"); + } } } } diff --git a/test/blackbox/CMakeLists.txt b/test/blackbox/CMakeLists.txt index f740b31f4dd..4db3ad1c5c9 100644 --- a/test/blackbox/CMakeLists.txt +++ b/test/blackbox/CMakeLists.txt @@ -75,7 +75,7 @@ macro(add_blackbox_gtest) list(GET GTEST_TEST_NAME 3 GTEST_TEST_NAME) add_test(NAME ${test}.${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}.Transport - COMMAND ${command} --gtest_filter=*/${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}/Transport) + COMMAND ${command} --gtest_filter=*/${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}/Transport*) # Add environment if(WIN32) @@ -90,7 +90,7 @@ macro(add_blackbox_gtest) set_property(TEST ${test}.${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}.Transport PROPERTY LABELS "${GTEST_LABELS}") add_test(NAME ${test}.${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}.Intraprocess - COMMAND ${command} --gtest_filter=*/${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}/Intraprocess) + COMMAND ${command} --gtest_filter=*/${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}/Intraprocess*) # Add environment if(WIN32) @@ -106,7 +106,7 @@ macro(add_blackbox_gtest) if(${test} MATCHES ".*_DDS_PIM$") add_test(NAME ${test}.${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}.Datasharing - COMMAND ${command} --gtest_filter=*/${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}/Datasharing) + COMMAND ${command} --gtest_filter=*/${GTEST_GROUP_NAME}.${GTEST_TEST_NAME}/Datasharing*) # Add environment if(WIN32) diff --git a/test/blackbox/api/dds-pim/PubSubParticipant.hpp b/test/blackbox/api/dds-pim/PubSubParticipant.hpp index 65d10455274..6e01cc24921 100644 --- a/test/blackbox/api/dds-pim/PubSubParticipant.hpp +++ b/test/blackbox/api/dds-pim/PubSubParticipant.hpp @@ -166,7 +166,6 @@ class PubSubParticipant , 
sub_times_liveliness_lost_(0) , sub_times_liveliness_recovered_(0) { - if (enable_datasharing) { datareader_qos_.data_sharing().automatic(); @@ -178,6 +177,11 @@ class PubSubParticipant datawriter_qos_.data_sharing().off(); } + if (use_pull_mode) + { + datawriter_qos_.properties().properties().emplace_back("fastdds.push_mode", "false"); + } + #if defined(PREALLOCATED_WITH_REALLOC_MEMORY_MODE_TEST) datawriter_qos_.historyMemoryPolicy = rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; #elif defined(DYNAMIC_RESERVE_MEMORY_MODE_TEST) diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 351090ead1d..6eebeb0b3f9 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -277,6 +277,11 @@ class PubSubWriter datawriter_qos_.data_sharing().off(); } + if (use_pull_mode) + { + datawriter_qos_.properties().properties().emplace_back("fastdds.push_mode", "false"); + } + // By default, memory mode is preallocated (the most restritive) datawriter_qos_.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_MEMORY_MODE; diff --git a/test/blackbox/api/dds-pim/PubSubWriterReader.hpp b/test/blackbox/api/dds-pim/PubSubWriterReader.hpp index bbb1fe6fab0..5924278e105 100644 --- a/test/blackbox/api/dds-pim/PubSubWriterReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriterReader.hpp @@ -336,6 +336,11 @@ class PubSubWriterReader datawriter_qos_.data_sharing().off(); } + if (use_pull_mode) + { + datawriter_qos_.properties().properties().emplace_back("fastdds.push_mode", "false"); + } + // By default, memory mode is preallocated (the most restritive) datawriter_qos_.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_MEMORY_MODE; datareader_qos_.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_MEMORY_MODE; diff --git a/test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp b/test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp 
index 80cb3b97f58..2618cbd3b90 100644 --- a/test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp +++ b/test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp @@ -18,6 +18,7 @@ */ #include "ReqRepHelloWorldReplier.hpp" +#include "../../common/BlackboxTests.hpp" #include #include @@ -49,6 +50,9 @@ ReqRepHelloWorldReplier::ReqRepHelloWorldReplier() // By default, memory mode is preallocated (the most restritive) datareader_qos_.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_MEMORY_MODE; datawriter_qos_.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_MEMORY_MODE; + + datawriter_qos_.reliable_writer_qos().times.heartbeatPeriod.seconds = 1; + datawriter_qos_.reliable_writer_qos().times.heartbeatPeriod.nanosec = 0; } ReqRepHelloWorldReplier::~ReqRepHelloWorldReplier() @@ -115,6 +119,22 @@ void ReqRepHelloWorldReplier::init() ASSERT_NE(reply_publisher_, nullptr); ASSERT_TRUE(reply_publisher_->is_enabled()); + if (enable_datasharing) + { + datareader_qos_.data_sharing().automatic(); + datawriter_qos_.data_sharing().automatic(); + } + else + { + datareader_qos_.data_sharing().off(); + datawriter_qos_.data_sharing().off(); + } + + if (use_pull_mode) + { + datawriter_qos_.properties().properties().emplace_back("fastdds.push_mode", "false"); + } + //Create datareader request_datareader_ = request_subscriber_->create_datareader(request_topic_, datareader_qos_, &request_listener_); diff --git a/test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp b/test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp index fa21c107930..903390a1415 100644 --- a/test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp +++ b/test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp @@ -18,6 +18,7 @@ */ #include "ReqRepHelloWorldRequester.hpp" +#include "../../common/BlackboxTests.hpp" #include #include @@ -48,6 +49,9 @@ ReqRepHelloWorldRequester::ReqRepHelloWorldRequester() // By default, memory mode is preallocated (the most restritive) 
datareader_qos_.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_MEMORY_MODE; datawriter_qos_.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_MEMORY_MODE; + + datawriter_qos_.reliable_writer_qos().times.heartbeatPeriod.seconds = 1; + datawriter_qos_.reliable_writer_qos().times.heartbeatPeriod.nanosec = 0; } ReqRepHelloWorldRequester::~ReqRepHelloWorldRequester() @@ -113,6 +117,22 @@ void ReqRepHelloWorldRequester::init() ASSERT_NE(request_topic_, nullptr); ASSERT_TRUE(request_topic_->is_enabled()); + if (enable_datasharing) + { + datareader_qos_.data_sharing().automatic(); + datawriter_qos_.data_sharing().automatic(); + } + else + { + datareader_qos_.data_sharing().off(); + datawriter_qos_.data_sharing().off(); + } + + if (use_pull_mode) + { + datawriter_qos_.properties().properties().emplace_back("fastdds.push_mode", "false"); + } + //Create DataReader reply_datareader_ = reply_subscriber_->create_datareader(reply_topic_, datareader_qos_, &reply_listener_); ASSERT_NE(reply_datareader_, nullptr); diff --git a/test/blackbox/common/BlackboxTests.cpp b/test/blackbox/common/BlackboxTests.cpp index 6cf73e63002..0437b39ce4f 100644 --- a/test/blackbox/common/BlackboxTests.cpp +++ b/test/blackbox/common/BlackboxTests.cpp @@ -32,6 +32,7 @@ using namespace eprosima::fastrtps::rtps; uint16_t global_port = 0; bool enable_datasharing; +bool use_pull_mode; uint16_t get_port() { @@ -64,6 +65,7 @@ class BlackboxEnvironment : public ::testing::Environment att.intraprocess_delivery = INTRAPROCESS_OFF; eprosima::fastrtps::xmlparser::XMLProfileManager::library_settings(att); enable_datasharing = false; + use_pull_mode = false; //Log::SetVerbosity(eprosima::fastdds::dds::Log::Info); //Log::SetCategoryFilter(std::regex("(SECURITY)")); diff --git a/test/blackbox/common/BlackboxTests.hpp b/test/blackbox/common/BlackboxTests.hpp index ab8a1fc846e..f46b9d6ac1e 100644 --- a/test/blackbox/common/BlackboxTests.hpp +++ 
b/test/blackbox/common/BlackboxTests.hpp @@ -50,6 +50,7 @@ extern void tls_init(); extern uint16_t global_port; extern bool enable_datasharing; +extern bool use_pull_mode; /****** Auxiliary print functions ******/ template diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index 2c642d63a23..23b2369ec81 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -460,6 +460,7 @@ TEST(Discovery, EndpointRediscoveryWithTransientLocalData) writer .lease_duration({ 2, 0 }, { 1, 0 }) + .history_depth(10) .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) .durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS) .init(); @@ -610,7 +611,7 @@ TEST(Discovery, LocalInitialPeers) writer_default_unicast_locator.push_back(loc_default_unicast); writer.metatraffic_unicast_locator_list(writer_default_unicast_locator). - initial_peers(writer_initial_peers).init(); + initial_peers(writer_initial_peers).history_depth(10).init(); ASSERT_TRUE(writer.isInitialized()); diff --git a/test/blackbox/common/BlackboxTestsPubSubBasic.cpp b/test/blackbox/common/BlackboxTestsPubSubBasic.cpp index 235708fd025..e97264be672 100644 --- a/test/blackbox/common/BlackboxTestsPubSubBasic.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubBasic.cpp @@ -23,6 +23,8 @@ #include +#include + using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; @@ -33,14 +35,14 @@ enum communication_type DATASHARING }; -class PubSubBasic : public testing::TestWithParam +class PubSubBasic : public testing::TestWithParam> { public: void SetUp() override { LibrarySettingsAttributes library_settings; - switch (GetParam()) + switch (std::get<0>(GetParam())) { case INTRAPROCESS: library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_FULL; @@ -53,12 +55,14 @@ class PubSubBasic : public testing::TestWithParam default: break; } + + use_pull_mode = 
std::get<1>(GetParam()); } void TearDown() override { LibrarySettingsAttributes library_settings; - switch (GetParam()) + switch (std::get<0>(GetParam())) { case INTRAPROCESS: library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_OFF; @@ -71,12 +75,20 @@ class PubSubBasic : public testing::TestWithParam default: break; } + + use_pull_mode = false; } }; TEST_P(PubSubBasic, PubSubAsNonReliableHelloworld) { + // Best effort incompatible with pull mode + if (use_pull_mode) + { + return; + } + PubSubReader reader(TEST_TOPIC_NAME); PubSubWriter writer(TEST_TOPIC_NAME); @@ -662,20 +674,22 @@ TEST_P(PubSubBasic, unique_flows_one_writer_two_readers) GTEST_INSTANTIATE_TEST_MACRO(PubSubBasic, PubSubBasic, - testing::Values(TRANSPORT, INTRAPROCESS, DATASHARING), + testing::Combine(testing::Values(TRANSPORT, INTRAPROCESS, DATASHARING), testing::Values(false, true)), [](const testing::TestParamInfo& info) { - switch (info.param) + bool pull_mode = std::get<1>(info.param); + std::string suffix = pull_mode ? 
"_pull_mode" : ""; + switch (std::get<0>(info.param)) { case INTRAPROCESS: - return "Intraprocess"; + return "Intraprocess" + suffix; break; case DATASHARING: - return "Datasharing"; + return "Datasharing" + suffix; break; case TRANSPORT: default: - return "Transport"; + return "Transport" + suffix; } }); diff --git a/test/unittest/dds/publisher/DataWriterTests.cpp b/test/unittest/dds/publisher/DataWriterTests.cpp index 47244884ea3..009b8c3d43f 100644 --- a/test/unittest/dds/publisher/DataWriterTests.cpp +++ b/test/unittest/dds/publisher/DataWriterTests.cpp @@ -334,8 +334,7 @@ TEST(DataWriterTests, InvalidQos) { DomainParticipantQos pqos = PARTICIPANT_QOS_DEFAULT; pqos.entity_factory().autoenable_created_entities = false; - DomainParticipant* participant = - DomainParticipantFactory::get_instance()->create_participant(0, pqos); + DomainParticipant* participant = DomainParticipantFactory::get_instance()->create_participant(0, pqos); ASSERT_NE(participant, nullptr); Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT); @@ -350,46 +349,61 @@ TEST(DataWriterTests, InvalidQos) DataWriter* datawriter = publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT); ASSERT_NE(datawriter, nullptr); + /* Unsupported QoS */ + const ReturnCode_t unsupported_code = ReturnCode_t::RETCODE_UNSUPPORTED; + DataWriterQos qos; qos = DATAWRITER_QOS_DEFAULT; qos.durability().kind = PERSISTENT_DURABILITY_QOS; - ASSERT_TRUE(datawriter->set_qos(qos) == ReturnCode_t::RETCODE_UNSUPPORTED); + EXPECT_EQ(unsupported_code, datawriter->set_qos(qos)); qos = DATAWRITER_QOS_DEFAULT; qos.destination_order().kind = BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS; - ASSERT_TRUE(datawriter->set_qos(qos) == ReturnCode_t::RETCODE_UNSUPPORTED); + EXPECT_EQ(unsupported_code, datawriter->set_qos(qos)); qos = DATAWRITER_QOS_DEFAULT; qos.properties().properties().emplace_back("fastdds.unique_network_flows", ""); - ASSERT_TRUE(datawriter->set_qos(qos) == ReturnCode_t::RETCODE_UNSUPPORTED); + 
EXPECT_EQ(unsupported_code, datawriter->set_qos(qos)); + + /* Inconsistent QoS */ + const ReturnCode_t inconsistent_code = ReturnCode_t::RETCODE_INCONSISTENT_POLICY; + + qos = DATAWRITER_QOS_DEFAULT; + qos.properties().properties().emplace_back("fastdds.push_mode", "false"); + qos.reliability().kind = BEST_EFFORT_RELIABILITY_QOS; + EXPECT_EQ(inconsistent_code, datawriter->set_qos(qos)); + + qos.reliability().kind = RELIABLE_RELIABILITY_QOS; + qos.reliable_writer_qos().times.heartbeatPeriod = eprosima::fastrtps::c_TimeInfinite; + EXPECT_EQ(inconsistent_code, datawriter->set_qos(qos)); qos = DATAWRITER_QOS_DEFAULT; qos.reliability().kind = BEST_EFFORT_RELIABILITY_QOS; qos.ownership().kind = EXCLUSIVE_OWNERSHIP_QOS; - ASSERT_TRUE(datawriter->set_qos(qos) == ReturnCode_t::RETCODE_INCONSISTENT_POLICY); + EXPECT_EQ(inconsistent_code, datawriter->set_qos(qos)); qos = DATAWRITER_QOS_DEFAULT; qos.liveliness().kind = AUTOMATIC_LIVELINESS_QOS; qos.liveliness().announcement_period = 20; qos.liveliness().lease_duration = 10; - ASSERT_TRUE(datawriter->set_qos(qos) == ReturnCode_t::RETCODE_INCONSISTENT_POLICY); + EXPECT_EQ(inconsistent_code, datawriter->set_qos(qos)); qos.liveliness().kind = MANUAL_BY_PARTICIPANT_LIVELINESS_QOS; - ASSERT_TRUE(datawriter->set_qos(qos) == ReturnCode_t::RETCODE_INCONSISTENT_POLICY); + EXPECT_EQ(inconsistent_code, datawriter->set_qos(qos)); qos = DATAWRITER_QOS_DEFAULT; qos.data_sharing().on("/tmp"); qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::DYNAMIC_RESERVE_MEMORY_MODE; - ASSERT_TRUE(datawriter->set_qos(qos) == ReturnCode_t::RETCODE_INCONSISTENT_POLICY); + EXPECT_EQ(inconsistent_code, datawriter->set_qos(qos)); qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::DYNAMIC_REUSABLE_MEMORY_MODE; - ASSERT_TRUE(datawriter->set_qos(qos) == ReturnCode_t::RETCODE_INCONSISTENT_POLICY); + EXPECT_EQ(inconsistent_code, datawriter->set_qos(qos)); qos.endpoint().history_memory_policy = 
eprosima::fastrtps::rtps::PREALLOCATED_MEMORY_MODE; - ASSERT_TRUE(datawriter->set_qos(qos) == ReturnCode_t::RETCODE_OK); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, datawriter->set_qos(qos)); qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; - ASSERT_TRUE(datawriter->set_qos(qos) == ReturnCode_t::RETCODE_OK); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, datawriter->set_qos(qos)); ASSERT_TRUE(publisher->delete_datawriter(datawriter) == ReturnCode_t::RETCODE_OK); ASSERT_TRUE(participant->delete_topic(topic) == ReturnCode_t::RETCODE_OK);