From 3e270a26d9ca1b5142a468a8d3dc9b83818135db Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Sun, 19 May 2024 20:40:33 +0200 Subject: [PATCH] New `max_message_size` property to limit output datagrams size (#4777) (#4807) * Refs #20489. Parse property in RTPS writer. Signed-off-by: Miguel Company * Refs #20849. Parse property in RTPS participant. Signed-off-by: Miguel Company * Refs #20849: Add test for RTPS writer Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #20849: Add test for participant Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #20849: Apply suggestions Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #20849: Uncrustify Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #20849: Apply suggestions Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #20849: Add tests in DDS layer Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> * Refs #20849: Apply suggestions Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> --------- Signed-off-by: Miguel Company Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com> Co-authored-by: Miguel Company (cherry picked from commit 6d20b649bc25a13ed38665e5eff8b2ed9f2b02ce) Co-authored-by: elianalf <62831776+elianalf@users.noreply.github.com> (cherry picked from commit 0571391b594ef047b80e58f732da7e6531dbcd24) # Conflicts: # src/cpp/rtps/participant/RTPSParticipantImpl.cpp # test/blackbox/common/DDSBlackboxTestsBasic.cpp --- include/fastdds/rtps/writer/RTPSWriter.h | 3 + .../rtps/participant/RTPSParticipantImpl.cpp | 26 +- .../rtps/participant/RTPSParticipantImpl.h | 3 + src/cpp/rtps/writer/RTPSWriter.cpp | 40 +- .../blackbox/common/DDSBlackboxTestsBasic.cpp | 574 ++++++++++++++++++ .../common/RTPSBlackboxTestsBasic.cpp | 95 +++ 6 files changed, 728 insertions(+), 13 deletions(-) diff --git a/include/fastdds/rtps/writer/RTPSWriter.h b/include/fastdds/rtps/writer/RTPSWriter.h index fc4d1395504..38753d45ba2 100644 --- a/include/fastdds/rtps/writer/RTPSWriter.h +++ b/include/fastdds/rtps/writer/RTPSWriter.h @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -472,6 +473,8 @@ class RTPSWriter //! Flow controller. fastdds::rtps::FlowController* flow_controller_; + //! Maximum number of bytes allowed for an RTPS datagram generated by this writer. + uint32_t max_output_message_size_ = std::numeric_limits::max(); //!WriterHistory WriterHistory* mp_history = nullptr; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 21dba7ca80c..b04df2dfbb1 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -359,6 +359,7 @@ RTPSParticipantImpl::RTPSParticipantImpl( }); } +<<<<<<< HEAD // Creation of user locator and receiver resources //If no default locators are defined we define some. /* The reasoning here is the following. @@ -423,6 +424,24 @@ RTPSParticipantImpl::RTPSParticipantImpl( logWarning(RTPS_PARTICIPANT, "Metatraffic multicast port " << meta_multicast_port_for_check << " cannot be opened." " It may is opened by another application. Discovery may fail."); +======= +void RTPSParticipantImpl::setup_output_traffic() +{ + { + const std::string* max_size_property = + PropertyPolicyHelper::find_property(m_att.properties, "fastdds.max_message_size"); + if (max_size_property != nullptr) + { + try + { + max_output_message_size_ = std::stoul(*max_size_property); + } + catch (const std::exception& e) + { + EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error parsing max_message_size property: " << e.what()); + } + } +>>>>>>> 0571391b5 (New `max_message_size` property to limit output datagrams size (#4777) (#4807)) } bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic; @@ -2109,8 +2128,11 @@ uint32_t RTPSParticipantImpl::getMaxMessageSize() const #endif // if HAVE_SECURITY return (std::min)( - m_network_Factory.get_max_message_size_between_transports(), - max_receiver_buffer_size); + { + max_output_message_size_, + m_network_Factory.get_max_message_size_between_transports(), + max_receiver_buffer_size + }); } uint32_t RTPSParticipantImpl::getMaxDataSize() diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 5a06b0a7139..eb765390ad0 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -574,6 +575,8 @@ class RTPSParticipantImpl std::function type_check_fn_; //!Pool of send buffers std::unique_ptr send_buffers_; + //! Maximum number of bytes allowed for an RTPS datagram generated by this writer. + uint32_t max_output_message_size_ = std::numeric_limits::max(); /** * Client override flag: SIMPLE participant that has been overriden with the environment variable and transformed diff --git a/src/cpp/rtps/writer/RTPSWriter.cpp b/src/cpp/rtps/writer/RTPSWriter.cpp index 9c5f188efb0..1f36fbc6b9d 100644 --- a/src/cpp/rtps/writer/RTPSWriter.cpp +++ b/src/cpp/rtps/writer/RTPSWriter.cpp @@ -17,24 +17,22 @@ * */ -#include - -#include -#include - -#include -#include +#include -#include +#include #include -#include - +#include #include - #include +#include +#include +#include +#include +#include + #include #include @@ -109,6 +107,22 @@ void RTPSWriter::init( const std::shared_ptr& change_pool, const WriterAttributes& att) { + { + const std::string* max_size_property = + PropertyPolicyHelper::find_property(att.endpoint.properties, "fastdds.max_message_size"); + if (max_size_property != nullptr) + { + try + { + max_output_message_size_ = std::stoul(*max_size_property); + } + catch (const std::exception& e) + { + EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error parsing max_message_size property: " << e.what()); + } + } + } + payload_pool_ = payload_pool; change_pool_ = change_pool; fixed_payload_size_ = 0; @@ -306,6 +320,10 @@ uint32_t RTPSWriter::getMaxDataSize() uint32_t flow_max = flow_controller_->get_max_payload(); uint32_t part_max = mp_RTPSParticipant->getMaxMessageSize(); uint32_t max_size = flow_max > part_max ? part_max : flow_max; + if (max_output_message_size_ < max_size) + { + max_size = max_output_message_size_; + } max_size = calculateMaxDataSize(max_size); return max_size &= ~3; diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index f9e6110a383..afc97e7c09d 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -271,6 +271,580 @@ TEST(DDSBasic, MultithreadedReaderCreationDoesNotDeadlock) ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant)); } +<<<<<<< HEAD +======= +/** + * Read a parameterList from a CDRMessage. + * Search for PID_CUSTOM_RELATED_SAMPLE_IDENTITY and PID_RELATED_SAMPLE_IDENTITY. + * Overwrite PID_CUSTOM_RELATED_SAMPLE_IDENTITY to just leave the new one in msg. + * @param[in] msg Reference to the message. + * @param[out] exists_pid_related_sample_identity True if the parameter is inside msg. + * @param[out] exists_pid_custom_related_sample_identity True if the parameter is inside msg. + * @return true if parsing was correct, false otherwise. + */ +bool check_related_sample_identity_field( + fastrtps::rtps::CDRMessage_t& msg, + bool& exists_pid_related_sample_identity, + bool& exists_pid_custom_related_sample_identity) +{ + uint32_t qos_size = 0; + + auto parameter_process = [&]( + fastrtps::rtps::CDRMessage_t* msg, + ParameterId_t& pid, + uint16_t plength, + uint32_t& pid_pos) + { + switch (pid) + { + case PID_CUSTOM_RELATED_SAMPLE_IDENTITY: + { + if (plength >= 24) + { + ParameterSampleIdentity_t p(pid, plength); + if (!fastdds::dds::ParameterSerializer::read_from_cdr_message(p, + msg, plength)) + { + return false; + } + exists_pid_custom_related_sample_identity = true; + // Invalid assignment to overwrite parameter, in order to just send the standard PID_RELATED_SAMPLE_IDENTITY + msg->buffer[pid_pos] = 0xff; + msg->buffer[pid_pos + 1] = 0xff; + } + break; + } + case PID_RELATED_SAMPLE_IDENTITY: + { + if (plength >= 24) + { + ParameterSampleIdentity_t p(pid, plength); + if (!fastdds::dds::ParameterSerializer::read_from_cdr_message(p, + msg, plength)) + { + return false; + } + exists_pid_related_sample_identity = true; + } + break; + } + + default: + break; + } + return true; + }; + + uint32_t original_pos = msg.pos; + bool is_sentinel = false; + while (!is_sentinel) + { + msg.pos = original_pos + qos_size; + + ParameterId_t pid{PID_SENTINEL}; + uint16_t plength = 0; + bool valid = true; + auto msg_pid_pos = msg.pos; + valid &= fastrtps::rtps::CDRMessage::readUInt16(&msg, (uint16_t*)&pid); + valid &= fastrtps::rtps::CDRMessage::readUInt16(&msg, &plength); + + if (pid == PID_SENTINEL) + { + // PID_SENTINEL is always considered of length 0 + plength = 0; + is_sentinel = true; + } + + qos_size += (4 + plength); + + // Align to 4 byte boundary and prepare for next iteration + qos_size = (qos_size + 3) & ~3; + + if (!valid || ((msg.pos + plength) > msg.length)) + { + return false; + } + else if (!is_sentinel) + { + if (!parameter_process(&msg, pid, plength, msg_pid_pos)) + { + return false; + } + } + } + return true; +} + +/** + * This test checks that PID_RELATED_SAMPLE_IDENTITY and + * PID_CUSTOM_RELATED_SAMPLE_IDENTITY are being sent as parameter, + * and that the new PID_RELATED_SAMPLE_IDENTITY is being properly + * interpreted. + * Inside the transport filter, both PIDs are indentified, and the old PID is overwritten. + * Reader only receives the new PID, and identifies the related sample identity. + */ +TEST(DDSBasic, PidRelatedSampleIdentity) +{ + PubSubWriter reliable_writer(TEST_TOPIC_NAME); + PubSubReader reliable_reader(TEST_TOPIC_NAME); + + // Test transport will be used in order to filter inlineQoS + auto test_transport = std::make_shared(); + bool exists_pid_related_sample_identity = false; + bool exists_pid_custom_related_sample_identity = false; + + test_transport->drop_data_messages_filter_ = + [&exists_pid_related_sample_identity, &exists_pid_custom_related_sample_identity] + (eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool + { + // Inside this filter, the two flags passed in register whether both PIDs are included in the msg to be sent. + // The legacy value is overwritten in order to send a msg with only the standard PID_RELATED_SAMPLE_IDENTITY as valid parameter, + // so that the reader will only parse that one. + bool ret = check_related_sample_identity_field(msg, exists_pid_related_sample_identity, + exists_pid_custom_related_sample_identity); + EXPECT_TRUE(ret); + return false; + }; + + reliable_writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .disable_builtin_transport() + .add_user_transport_to_pparams(test_transport) + .init(); + ASSERT_TRUE(reliable_writer.isInitialized()); + + reliable_reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .disable_builtin_transport() + .add_user_transport_to_pparams(test_transport) + .init(); + ASSERT_TRUE(reliable_reader.isInitialized()); + + reliable_writer.wait_discovery(); + reliable_reader.wait_discovery(); + + DataWriter& native_writer = reliable_writer.get_native_writer(); + + HelloWorld data; + // Send reply associating it with the client request. + eprosima::fastrtps::rtps::WriteParams write_params; + eprosima::fastrtps::rtps::SampleIdentity related_sample_identity_; + eprosima::fastrtps::rtps::GUID_t unknown_guid; + related_sample_identity_.writer_guid(unknown_guid); + eprosima::fastrtps::rtps::SequenceNumber_t seq(51, 24); + related_sample_identity_.sequence_number(seq); + write_params.related_sample_identity() = related_sample_identity_; + + // Publish the new value, deduce the instance handle + bool write_ret = native_writer.write((void*)&data, write_params); + ASSERT_EQ(true, write_ret); + + DataReader& native_reader = reliable_reader.get_native_reader(); + + HelloWorld read_data; + eprosima::fastdds::dds::SampleInfo info; + eprosima::fastrtps::Duration_t timeout; + timeout.seconds = 2; + while (!native_reader.wait_for_unread_message(timeout)) + { + } + + ASSERT_EQ(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK, + native_reader.take_next_sample((void*)&read_data, &info)); + + ASSERT_TRUE(exists_pid_related_sample_identity); + ASSERT_TRUE(exists_pid_custom_related_sample_identity); + + ASSERT_EQ(related_sample_identity_, info.related_sample_identity); +} + +/** + * This test checks that PID_RELATED_SAMPLE_IDENTITY and + * PID_CUSTOM_RELATED_SAMPLE_IDENTITY are being sent as parameter, + * and that the new PID_RELATED_SAMPLE_IDENTITY is being properly + * interpreted. + * Inside the transport filter, both PIDs are indentified, and the old PID is overwritten. + * Reader only receives the new PID, and identifies the related sample identity. + */ +TEST(DDSBasic, IgnoreParticipant) +{ + + class IgnoringDomainParticipantListener : public DomainParticipantListener + { + public: + + std::atomic_int num_matched{0}; + std::atomic_int num_ignored{0}; + + void on_participant_discovery( + DomainParticipant* /*participant*/, + eprosima::fastrtps::rtps::ParticipantDiscoveryInfo&& info, + bool& should_be_ignored) override + { + std::cout << "Using custom listener" << std::endl; + if (info.status == info.DISCOVERED_PARTICIPANT) + { + std::cout << "Discovered participant" << std::endl; + if (info.info.m_userData == std::vector({ 'i', 'g', 'n' })) + { + std::cout << "Ignoring participant" << std::endl; + should_be_ignored = true; + num_ignored++; + } + else + { + std::cout << "Accepting participant" << std::endl; + num_matched++; + } + } + } + + private: + + using DomainParticipantListener::on_participant_discovery; + }; + // Set DomainParticipantFactory to create disabled entities + DomainParticipantFactoryQos factory_qos; + DomainParticipantFactory* factory = DomainParticipantFactory::get_instance(); + + DomainParticipantQos ignored_participant_qos; + DomainParticipantQos valid_participant_qos; + DomainParticipantQos other_participant_qos; + + const char* prefix = "00.00.00.00.00.00.FF.FF.FF.FF.FF.FF"; + + std::istringstream is(prefix); + + is >> ignored_participant_qos.wire_protocol().prefix; + + ignored_participant_qos.user_data().data_vec({ 'i', 'g', 'n' }); + valid_participant_qos.user_data().data_vec({ 'o', 'k' }); + + IgnoringDomainParticipantListener ignListener; + DomainParticipant* participant_listener = factory->create_participant( + (uint32_t)GET_PID() % 230, other_participant_qos, &ignListener); + std::this_thread::sleep_for(std::chrono::seconds(2)); + DomainParticipant* participant_ign = + factory->create_participant((uint32_t)GET_PID() % 230, ignored_participant_qos); + std::this_thread::sleep_for(std::chrono::seconds(2)); + DomainParticipant* participant_valid = + factory->create_participant((uint32_t)GET_PID() % 230, valid_participant_qos); + std::this_thread::sleep_for(std::chrono::seconds(2)); + + factory->delete_participant(participant_ign); + + ignored_participant_qos.user_data().data_vec({ 'o', 'k' }); + std::this_thread::sleep_for(std::chrono::seconds(2)); + participant_ign = factory->create_participant((uint32_t)GET_PID() % 230, ignored_participant_qos); + std::this_thread::sleep_for(std::chrono::seconds(2)); + ASSERT_EQ (ignListener.num_matched.load(), 1); + ASSERT_EQ (ignListener.num_ignored.load(), 1); + + factory->delete_participant(participant_valid); + factory->delete_participant(participant_listener); + factory->delete_participant(participant_ign); + +} + +/** + * @test This test checks the ignore local endpoints feature on the RTPS layer when writer and + * reader are under the same participant. Corresponds with tests: + * * PART-IGNORE-LOCAL-TEST:01 + * * PART-IGNORE-LOCAL-TEST:02 + * * PART-IGNORE-LOCAL-TEST:03 + */ +TEST(DDSBasic, participant_ignore_local_endpoints) +{ + class CustomLogConsumer : public LogConsumer + { + public: + + void Consume( + const Log::Entry&) override + { + logs_consumed_++; + cv_.notify_all(); + } + + size_t wait_for_entries( + uint32_t amount, + const std::chrono::duration& max_wait) + { + std::unique_lock lock(mtx_); + cv_.wait_for(lock, max_wait, [this, amount]() -> bool + { + return logs_consumed_ > 0 && logs_consumed_.load() == amount; + }); + return logs_consumed_.load(); + } + + protected: + + std::atomic logs_consumed_{0u}; + std::mutex mtx_; + std::condition_variable cv_; + }; + + struct Config + { + std::string test_id; + std::string property_value; + uint8_t log_errors; + uint8_t expected_matched_endpoints; + uint8_t sent_samples; + uint8_t expected_received_samples; + }; + + std::vector tests_configs = { + {"PART-IGNORE-LOCAL-TEST:01", "true", 0, 0, 5, 0}, + {"PART-IGNORE-LOCAL-TEST:02", "false", 0, 1, 5, 5}, + {"PART-IGNORE-LOCAL-TEST:03", "asdfg", 1, 1, 5, 5} + }; + + for (Config test_config : tests_configs) + { + std::cout << std::endl; + std::cout << "---------------------------------------" << std::endl; + std::cout << "Running test: " << test_config.test_id << std::endl; + std::cout << "---------------------------------------" << std::endl; + + /* Set up */ + Log::Reset(); + Log::SetVerbosity(Log::Error); + CustomLogConsumer* log_consumer = new CustomLogConsumer(); + std::unique_ptr log_consumer_unique_ptr(log_consumer); + Log::RegisterConsumer(std::move(log_consumer_unique_ptr)); + + // Create the DomainParticipant with the appropriate value for the property + PubSubWriterReader writer_reader(TEST_TOPIC_NAME); + eprosima::fastrtps::rtps::PropertyPolicy property_policy; + property_policy.properties().emplace_back("fastdds.ignore_local_endpoints", test_config.property_value); + writer_reader.property_policy(property_policy); + + /* Procedure */ + // Create the DataWriter & DataReader + writer_reader.init(); + EXPECT_TRUE(writer_reader.isInitialized()); + + // Wait for discovery + writer_reader.wait_discovery(test_config.expected_matched_endpoints, std::chrono::seconds(1)); + EXPECT_EQ(writer_reader.get_publication_matched(), test_config.expected_matched_endpoints); + EXPECT_EQ(writer_reader.get_subscription_matched(), test_config.expected_matched_endpoints); + + // Send samples + auto samples = default_helloworld_data_generator(test_config.sent_samples); + writer_reader.startReception(samples); + writer_reader.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + EXPECT_EQ(writer_reader.block_for_all(std::chrono::seconds(1)), test_config.expected_received_samples); + + // Wait for log entries + EXPECT_EQ(log_consumer->wait_for_entries(test_config.log_errors, std::chrono::seconds( + 1)), test_config.log_errors); + + /* Tear-down */ + Log::Reset(); + } +} + +/** + * @test This test checks the ignore local endpoints feature on the RTPS layer when writer and + * reader are under the different participant. Corresponds with test: + * * PART-IGNORE-LOCAL-TEST:07 + */ +TEST(DDSBasic, participant_ignore_local_endpoints_two_participants) +{ + std::cout << std::endl; + std::cout << "---------------------------------------" << std::endl; + std::cout << "Running test: PART-IGNORE-LOCAL-TEST:07" << std::endl; + std::cout << "---------------------------------------" << std::endl; + + /* Set up */ + + // Create the DomainParticipants with the appropriate value for the property + eprosima::fastrtps::rtps::PropertyPolicy property_policy; + property_policy.properties().emplace_back("fastdds.ignore_local_endpoints", "true"); + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + writer.property_policy(property_policy); + reader.property_policy(property_policy); + + /* Procedure */ + // Create the DataWriter & DataReader + writer.init(); + EXPECT_TRUE(writer.isInitialized()); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + // Wait for discovery + writer.wait_discovery(std::chrono::seconds(1)); + reader.wait_discovery(std::chrono::seconds(1)); + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_helloworld_data_generator(5); + reader.startReception(samples); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + EXPECT_EQ(reader.block_for_all(std::chrono::seconds(1)), 5); +} + +/** + * @test This test checks both the visibility of custom pool functions + * for DataReader and DataWriters while also testing their correct + * behavior + */ +TEST(DDSBasic, endpoint_custom_payload_pools) +{ + DomainParticipant* participant = + DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT); + ASSERT_NE(participant, nullptr); + + Subscriber* subscriber = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT); + ASSERT_NE(subscriber, nullptr); + + Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT); + ASSERT_NE(publisher, nullptr); + + // Register type + TypeSupport type; + + type.reset(new StringTestPubSubType()); + type.register_type(participant); + ASSERT_NE(nullptr, type); + + type.register_type(participant); + + Topic* topic = participant->create_topic("footopic", type.get_type_name(), TOPIC_QOS_DEFAULT); + ASSERT_NE(topic, nullptr); + + // Next QoS config checks the default qos configuration, + // create_datareader() should not return nullptr. + DataReaderQos reader_qos = DATAREADER_QOS_DEFAULT; + + std::shared_ptr reader_payload_pool = std::make_shared(); + + std::shared_ptr writer_payload_pool = std::make_shared(); + + DataReader* data_reader = subscriber->create_datareader_with_payload_pool( + topic, reader_qos, reader_payload_pool, nullptr, StatusMask::all()); + + DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT; + + DataWriter* data_writer = publisher->create_datawriter_with_payload_pool( + topic, writer_qos, writer_payload_pool, nullptr, StatusMask::all()); + + ASSERT_NE(data_reader, nullptr); + ASSERT_NE(data_writer, nullptr); + + StringTest data; + data.message("Lorem Ipsum"); + + data_writer->write(&data, HANDLE_NIL); + + std::this_thread::sleep_for(std::chrono::seconds(2)); + + ASSERT_EQ(reader_payload_pool->requested_payload_count, 1u); + ASSERT_EQ(writer_payload_pool->requested_payload_count, 1u); + + participant->delete_contained_entities(); +} + +/** + * @test Set the maximum number of bytes allowed for a datagram generated by a DomainParticipant. + */ +TEST(DDSBasic, max_output_message_size_participant) +{ + PubSubReader reader(TEST_TOPIC_NAME); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + auto testTransport = std::make_shared(); + const uint32_t segment_size = 1470; + std::string segment_size_str = std::to_string(segment_size); + testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram) + { + EXPECT_LE(datagram.length, segment_size); + // Never drop samples + return false; + }; + + // Create the DomainParticipants with the appropriate value for the property + eprosima::fastrtps::rtps::PropertyPolicy property_policy; + property_policy.properties().emplace_back("fastdds.max_message_size", segment_size_str); + PubSubWriter writer(TEST_TOPIC_NAME); + writer.property_policy(property_policy).disable_builtin_transport() + .add_user_transport_to_pparams(testTransport).init(); + EXPECT_TRUE(writer.isInitialized()); + + // Wait for discovery + writer.wait_discovery(std::chrono::seconds(2)); + reader.wait_discovery(std::chrono::seconds(2)); + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_data16kb_data_generator(1); + reader.startReception(samples); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + reader.block_for_all(std::chrono::seconds(1)); + EXPECT_EQ(reader.getReceivedCount(), 1u); +} + +/** + * @test Set the maximum number of bytes allowed for a datagram generated by a DataWriter. + */ +TEST(DDSBasic, max_output_message_size_writer) +{ + const uint32_t segment_size = 1470; + std::string segment_size_str = std::to_string(segment_size); + + auto testTransport = std::make_shared(); + testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram) + { + EXPECT_LE(datagram.length, segment_size); + // Never drop samples + return false; + }; + + // Create the DataWriter with the appropriate value for the property + eprosima::fastrtps::rtps::PropertyPolicy property_policy; + property_policy.properties().emplace_back("fastdds.max_message_size", segment_size_str); + PubSubWriter writer(TEST_TOPIC_NAME); + writer.entity_property_policy(property_policy).disable_builtin_transport() + .add_user_transport_to_pparams(testTransport).init(); + ASSERT_TRUE(writer.isInitialized()); + + PubSubReader reader(TEST_TOPIC_NAME); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + // Wait for discovery + writer.wait_discovery(std::chrono::seconds(2)); + reader.wait_discovery(std::chrono::seconds(2)); + + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_data16kb_data_generator(1); + reader.startReception(samples); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + reader.block_for_all(std::chrono::seconds(1)); + EXPECT_EQ(reader.getReceivedCount(), 1u); + +} + +>>>>>>> 0571391b5 (New `max_message_size` property to limit output datagrams size (#4777) (#4807)) } // namespace dds } // namespace fastdds } // namespace eprosima diff --git a/test/blackbox/common/RTPSBlackboxTestsBasic.cpp b/test/blackbox/common/RTPSBlackboxTestsBasic.cpp index 7dd05a71e41..42d93a61fb6 100644 --- a/test/blackbox/common/RTPSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/RTPSBlackboxTestsBasic.cpp @@ -893,6 +893,101 @@ TEST(RTPS, RTPSCorrectGAPProcessing) } +/* Maximum number of bytes allowed for an RTPS datagram generated by this participant. */ +TEST(RTPS, max_output_message_size_participant) +{ + /* Set up */ + // Create the RTPSReader + RTPSWithRegistrationReader reader(TEST_TOPIC_NAME); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + // Create the RTPSParticipants with the appropriate value for the property + auto testTransport = std::make_shared(); + const uint32_t segment_size = 1470; + std::string segment_size_str = std::to_string(segment_size); + testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram) + { + EXPECT_LE(datagram.length, segment_size); + // Never drop samples + return false; + }; + + eprosima::fastrtps::rtps::RTPSParticipantAttributes patt; + patt.useBuiltinTransports = false; + patt.userTransports.push_back(testTransport); + patt.properties.properties().emplace_back("fastdds.max_message_size", segment_size_str); + eprosima::fastrtps::rtps::RTPSParticipant* participant_writer = + eprosima::fastrtps::rtps::RTPSDomain::createParticipant(static_cast(GET_PID()) % 230, patt); + ASSERT_NE(participant_writer, nullptr); + + // Create the RTPSWriter + RTPSWithRegistrationWriter writer(TEST_TOPIC_NAME, participant_writer); + writer.init(); + EXPECT_TRUE(writer.isInitialized()); + + // Wait for discovery + writer.wait_discovery(1, std::chrono::seconds(2)); + reader.wait_discovery(1, std::chrono::seconds(2)); + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_data16kb_data_generator(1); + reader.expected_data(samples); + reader.startReception(); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + reader.block_for_all(std::chrono::seconds(1)); + EXPECT_EQ(reader.getReceivedCount(), 1u); + + /* Tear-down */ + eprosima::fastrtps::rtps::RTPSDomain::removeRTPSParticipant(participant_writer); +} + +/* Maximum number of bytes allowed for an RTPS datagram generated by this writer. */ +TEST(RTPS, max_output_message_size_writer) +{ + const uint32_t segment_size = 1470; + std::string segment_size_str = std::to_string(segment_size); + + auto testTransport = std::make_shared(); + testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram) + { + EXPECT_LE(datagram.length, segment_size); + // Never drop samples + return false; + }; + RTPSWithRegistrationWriter writer(TEST_TOPIC_NAME); + writer.add_property("fastdds.max_message_size", segment_size_str). + disable_builtin_transport().add_user_transport_to_pparams(testTransport).init(); + ASSERT_TRUE(writer.isInitialized()); + + RTPSWithRegistrationReader reader(TEST_TOPIC_NAME); + reader.init(); + EXPECT_TRUE(reader.isInitialized()); + + writer.wait_discovery(); + reader.wait_discovery(); + + EXPECT_EQ(writer.get_matched(), 1u); + EXPECT_EQ(reader.get_matched(), 1u); + + // Send samples + auto samples = default_data16kb_data_generator(1); + reader.expected_data(samples); + reader.startReception(); + writer.send(samples); + EXPECT_TRUE(samples.empty()); + + // Wait for reception + reader.block_for_all(std::chrono::seconds(1)); + EXPECT_EQ(reader.getReceivedCount(), 1u); + +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else