Skip to content

Commit

Permalink
New max_message_size property to limit output datagrams size (#4777)
Browse files Browse the repository at this point in the history
* Refs #20489. Parse property in RTPS writer.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20849. Parse property in RTPS participant.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20849: Add test for RTPS writer

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Add test for participant

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Apply suggestions

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Uncrustify

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Apply suggestions

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Add tests in DDS layer

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

* Refs #20849: Apply suggestions

Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>

---------

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
Signed-off-by: elianalf <62831776+elianalf@users.noreply.github.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit 6d20b64)
  • Loading branch information
elianalf committed May 20, 2024
1 parent ca40f53 commit 8abdb54
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 13 deletions.
3 changes: 3 additions & 0 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <chrono>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <vector>
Expand Down Expand Up @@ -527,6 +528,8 @@ class RTPSWriter

//! Flow controller.
fastdds::rtps::FlowController* flow_controller_;
//! Maximum number of bytes allowed for an RTPS datagram generated by this writer.
uint32_t max_output_message_size_ = std::numeric_limits<uint32_t>::max();

//!WriterHistory
WriterHistory* mp_history = nullptr;
Expand Down
21 changes: 19 additions & 2 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,20 @@ RTPSParticipantImpl::RTPSParticipantImpl(
" It may is opened by another application. Discovery may fail.");
}

const std::string* max_size_property =
PropertyPolicyHelper::find_property(m_att.properties, "fastdds.max_message_size");
if (max_size_property != nullptr)
{
try
{
max_output_message_size_ = std::stoul(*max_size_property);
}
catch (const std::exception& e)
{
EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error parsing max_message_size property: " << e.what());
}
}

bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic;
size_t num_send_buffers = m_att.allocation.send_buffers.preallocated_number;
if (num_send_buffers == 0)
Expand Down Expand Up @@ -2191,8 +2205,11 @@ uint32_t RTPSParticipantImpl::getMaxMessageSize() const
#endif // if HAVE_SECURITY

return (std::min)(
m_network_Factory.get_max_message_size_between_transports(),
max_receiver_buffer_size);
{
max_output_message_size_,
m_network_Factory.get_max_message_size_between_transports(),
max_receiver_buffer_size
});
}

uint32_t RTPSParticipantImpl::getMaxDataSize()
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <limits>
#include <list>
#include <mutex>
#include <set>
Expand Down Expand Up @@ -594,6 +595,8 @@ class RTPSParticipantImpl
std::function<bool(const std::string&)> type_check_fn_;
//!Pool of send buffers
std::unique_ptr<SendBuffersManager> send_buffers_;
//! Maximum number of bytes allowed for an RTPS datagram generated by this writer.
uint32_t max_output_message_size_ = std::numeric_limits<uint32_t>::max();

/**
* Client override flag: SIMPLE participant that has been overriden with the environment variable and transformed
Expand Down
40 changes: 29 additions & 11 deletions src/cpp/rtps/writer/RTPSWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,22 @@
*
*/

#include <mutex>

#include <rtps/history/BasicPayloadPool.hpp>
#include <rtps/history/CacheChangePool.h>

#include <rtps/DataSharing/DataSharingNotifier.hpp>
#include <rtps/DataSharing/WriterPool.hpp>
#include <fastdds/rtps/writer/RTPSWriter.h>

#include <rtps/participant/RTPSParticipantImpl.h>
#include <mutex>

#include <fastdds/dds/log/Log.hpp>

#include <fastdds/rtps/writer/RTPSWriter.h>

#include <fastdds/rtps/attributes/PropertyPolicy.h>
#include <fastdds/rtps/history/WriterHistory.h>

#include <fastdds/rtps/messages/RTPSMessageCreator.h>

#include <rtps/DataSharing/DataSharingNotifier.hpp>
#include <rtps/DataSharing/WriterPool.hpp>
#include <rtps/history/BasicPayloadPool.hpp>
#include <rtps/history/CacheChangePool.h>
#include <rtps/participant/RTPSParticipantImpl.h>

#include <statistics/rtps/StatisticsBase.hpp>
#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>

Expand Down Expand Up @@ -109,6 +107,22 @@ void RTPSWriter::init(
const std::shared_ptr<IChangePool>& change_pool,
const WriterAttributes& att)
{
{
const std::string* max_size_property =
PropertyPolicyHelper::find_property(att.endpoint.properties, "fastdds.max_message_size");
if (max_size_property != nullptr)
{
try
{
max_output_message_size_ = std::stoul(*max_size_property);
}
catch (const std::exception& e)
{
EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error parsing max_message_size property: " << e.what());
}
}
}

payload_pool_ = payload_pool;
change_pool_ = change_pool;
fixed_payload_size_ = 0;
Expand Down Expand Up @@ -306,6 +320,10 @@ uint32_t RTPSWriter::getMaxDataSize()
uint32_t flow_max = flow_controller_->get_max_payload();
uint32_t part_max = mp_RTPSParticipant->getMaxMessageSize();
uint32_t max_size = flow_max > part_max ? part_max : flow_max;
if (max_output_message_size_ < max_size)
{
max_size = max_output_message_size_;
}

max_size = calculateMaxDataSize(max_size);
return max_size &= ~3;
Expand Down
91 changes: 91 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,97 @@ TEST(DDSBasic, endpoint_custom_payload_pools)
participant->delete_contained_entities();
}

/**
* @test Set the maximum number of bytes allowed for a datagram generated by a DomainParticipant.
*/
TEST(DDSBasic, max_output_message_size_participant)
{
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
reader.init();
EXPECT_TRUE(reader.isInitialized());

auto testTransport = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();
const uint32_t segment_size = 1470;
std::string segment_size_str = std::to_string(segment_size);
testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram)
{
EXPECT_LE(datagram.length, segment_size);
// Never drop samples
return false;
};

// Create the DomainParticipants with the appropriate value for the property
eprosima::fastrtps::rtps::PropertyPolicy property_policy;
property_policy.properties().emplace_back("fastdds.max_message_size", segment_size_str);
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);
writer.property_policy(property_policy).disable_builtin_transport()
.add_user_transport_to_pparams(testTransport).init();
EXPECT_TRUE(writer.isInitialized());

// Wait for discovery
writer.wait_discovery(std::chrono::seconds(2));
reader.wait_discovery(std::chrono::seconds(2));
EXPECT_EQ(writer.get_matched(), 1u);
EXPECT_EQ(reader.get_matched(), 1u);

// Send samples
auto samples = default_data16kb_data_generator(1);
reader.startReception(samples);
writer.send(samples);
EXPECT_TRUE(samples.empty());

// Wait for reception
reader.block_for_all(std::chrono::seconds(1));
EXPECT_EQ(reader.getReceivedCount(), 1u);
}

/**
* @test Set the maximum number of bytes allowed for a datagram generated by a DataWriter.
*/
TEST(DDSBasic, max_output_message_size_writer)
{
const uint32_t segment_size = 1470;
std::string segment_size_str = std::to_string(segment_size);

auto testTransport = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();
testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram)
{
EXPECT_LE(datagram.length, segment_size);
// Never drop samples
return false;
};

// Create the DataWriter with the appropriate value for the property
eprosima::fastrtps::rtps::PropertyPolicy property_policy;
property_policy.properties().emplace_back("fastdds.max_message_size", segment_size_str);
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);
writer.entity_property_policy(property_policy).disable_builtin_transport()
.add_user_transport_to_pparams(testTransport).init();
ASSERT_TRUE(writer.isInitialized());

PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
reader.init();
EXPECT_TRUE(reader.isInitialized());

// Wait for discovery
writer.wait_discovery(std::chrono::seconds(2));
reader.wait_discovery(std::chrono::seconds(2));

EXPECT_EQ(writer.get_matched(), 1u);
EXPECT_EQ(reader.get_matched(), 1u);

// Send samples
auto samples = default_data16kb_data_generator(1);
reader.startReception(samples);
writer.send(samples);
EXPECT_TRUE(samples.empty());

// Wait for reception
reader.block_for_all(std::chrono::seconds(1));
EXPECT_EQ(reader.getReceivedCount(), 1u);

}

} // namespace dds
} // namespace fastdds
} // namespace eprosima
95 changes: 95 additions & 0 deletions test/blackbox/common/RTPSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,101 @@ TEST(RTPS, participant_ignore_local_endpoints_two_participants)
eprosima::fastrtps::rtps::RTPSDomain::removeRTPSParticipant(participant_reader);
}

/* Maximum number of bytes allowed for an RTPS datagram generated by this participant. */
TEST(RTPS, max_output_message_size_participant)
{
/* Set up */
// Create the RTPSReader
RTPSWithRegistrationReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
reader.init();
EXPECT_TRUE(reader.isInitialized());

// Create the RTPSParticipants with the appropriate value for the property
auto testTransport = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();
const uint32_t segment_size = 1470;
std::string segment_size_str = std::to_string(segment_size);
testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram)
{
EXPECT_LE(datagram.length, segment_size);
// Never drop samples
return false;
};

eprosima::fastrtps::rtps::RTPSParticipantAttributes patt;
patt.useBuiltinTransports = false;
patt.userTransports.push_back(testTransport);
patt.properties.properties().emplace_back("fastdds.max_message_size", segment_size_str);
eprosima::fastrtps::rtps::RTPSParticipant* participant_writer =
eprosima::fastrtps::rtps::RTPSDomain::createParticipant(static_cast<uint32_t>(GET_PID()) % 230, patt);
ASSERT_NE(participant_writer, nullptr);

// Create the RTPSWriter
RTPSWithRegistrationWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME, participant_writer);
writer.init();
EXPECT_TRUE(writer.isInitialized());

// Wait for discovery
writer.wait_discovery(1, std::chrono::seconds(2));
reader.wait_discovery(1, std::chrono::seconds(2));
EXPECT_EQ(writer.get_matched(), 1u);
EXPECT_EQ(reader.get_matched(), 1u);

// Send samples
auto samples = default_data16kb_data_generator(1);
reader.expected_data(samples);
reader.startReception();
writer.send(samples);
EXPECT_TRUE(samples.empty());

// Wait for reception
reader.block_for_all(std::chrono::seconds(1));
EXPECT_EQ(reader.getReceivedCount(), 1u);

/* Tear-down */
eprosima::fastrtps::rtps::RTPSDomain::removeRTPSParticipant(participant_writer);
}

/* Maximum number of bytes allowed for an RTPS datagram generated by this writer. */
TEST(RTPS, max_output_message_size_writer)
{
const uint32_t segment_size = 1470;
std::string segment_size_str = std::to_string(segment_size);

auto testTransport = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();
testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram)
{
EXPECT_LE(datagram.length, segment_size);
// Never drop samples
return false;
};
RTPSWithRegistrationWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);
writer.add_property("fastdds.max_message_size", segment_size_str).
disable_builtin_transport().add_user_transport_to_pparams(testTransport).init();
ASSERT_TRUE(writer.isInitialized());

RTPSWithRegistrationReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
reader.init();
EXPECT_TRUE(reader.isInitialized());

writer.wait_discovery();
reader.wait_discovery();

EXPECT_EQ(writer.get_matched(), 1u);
EXPECT_EQ(reader.get_matched(), 1u);

// Send samples
auto samples = default_data16kb_data_generator(1);
reader.expected_data(samples);
reader.startReception();
writer.send(samples);
EXPECT_TRUE(samples.empty());

// Wait for reception
reader.block_for_all(std::chrono::seconds(1));
EXPECT_EQ(reader.getReceivedCount(), 1u);

}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down

0 comments on commit 8abdb54

Please sign in to comment.