diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index a2b68c64e9f..643cfbe5878 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -369,7 +369,7 @@ bool StatelessReader::processDataFragMsg( if (work_change->sequenceNumber < change_to_add->sequenceNumber) { // Pending change should be dropped. Check if it can be reused - if (work_change->serializedPayload.max_size <= sampleSize) + if (sampleSize <= work_change->serializedPayload.max_size) { // Sample fits inside pending change. Reuse it. work_change->copy_not_memcpy(change_to_add); diff --git a/src/cpp/transport/test_UDPv4Transport.cpp b/src/cpp/transport/test_UDPv4Transport.cpp index 8776a68de7f..f93495198cb 100644 --- a/src/cpp/transport/test_UDPv4Transport.cpp +++ b/src/cpp/transport/test_UDPv4Transport.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -41,6 +42,10 @@ test_UDPv4Transport::test_UDPv4Transport(const test_UDPv4TransportDescriptor& de test_UDPv4Transport_ShutdownAllNetwork = false; UDPv4Transport::mSendBufferSize = descriptor.sendBufferSize; UDPv4Transport::mReceiveBufferSize = descriptor.receiveBufferSize; + for (auto interf : descriptor.interfaceWhiteList) + { + UDPv4Transport::interface_whitelist_.emplace_back(asio::ip::address_v4::from_string(interf)); + } test_UDPv4Transport_DropLog.clear(); test_UDPv4Transport_DropLogLength = descriptor.dropLogLength; } diff --git a/test/blackbox/BlackboxTests.hpp b/test/blackbox/BlackboxTests.hpp index 86d95d5177d..f9cb2fff9b3 100644 --- a/test/blackbox/BlackboxTests.hpp +++ b/test/blackbox/BlackboxTests.hpp @@ -113,6 +113,8 @@ std::list default_data300kb_data_generator(size_t max = 0); std::list default_data300kb_mix_data_generator(size_t max = 0); +std::list default_data96kb_data300kb_data_generator(size_t max = 0); + /****** Auxiliary lambda functions ******/ extern const std::function default_helloworld_print; diff --git a/test/blackbox/BlackboxTestsPubSubFragments.cpp b/test/blackbox/BlackboxTestsPubSubFragments.cpp index b52bc981464..8bfea49ffc5 100644 --- a/test/blackbox/BlackboxTestsPubSubFragments.cpp +++ b/test/blackbox/BlackboxTestsPubSubFragments.cpp @@ -294,6 +294,68 @@ TEST(PubSubFragments, AsyncPubSubAsReliableData300kbInLossyConditions) testTransport->dropLogLength); } +// Test introduced to verify the fix of the bug (#7609 Do not reuse cache change if sample does not fit) +// detected in relase 1.9.4 +TEST(PubSubFragments, AsyncPubSubAsBestEffortAlternateSizeInLossyConditions) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + auto reader_transport = std::make_shared(); + reader_transport->interfaceWhiteList.push_back("127.0.0.1"); + + reader + .disable_builtin_transport() + .add_user_transport_to_pparams(reader_transport) + .history_depth(5) + .reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS) + .mem_policy(eprosima::fastrtps::rtps::DYNAMIC_RESERVE_MEMORY_MODE) + .init(); + + ASSERT_TRUE(reader.isInitialized()); + + + // To simulate lossy conditions, we are going to remove the default + // bultin transport, and instead use a lossy shim layer variant. + auto testTransport = std::make_shared(); + testTransport->sendBufferSize = 65536; + testTransport->receiveBufferSize = 65536; + // We drop 50% of all data frags + testTransport->dropDataFragMessagesPercentage = 50; + testTransport->dropLogLength = 10; + // Only one interface in order to really drop 50% of packages!!! + testTransport->interfaceWhiteList.push_back("127.0.0.1"); + + writer + .disable_builtin_transport() + .add_user_transport_to_pparams(testTransport) + .history_depth(5) + .asynchronously(eprosima::fastrtps::SYNCHRONOUS_PUBLISH_MODE) + .init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_data96kb_data300kb_data_generator(2); + + reader.startReception(data); + writer.send(data); + + // All data has 7 fragments so when 3 has been lost all data has been sent + // Wait until then + while(eprosima::fastrtps::rtps::test_UDPv4Transport::test_UDPv4Transport_DropLog.size() < 3) + { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + // A second should be enough time to assure all data has been received + std::this_thread::sleep_for(std::chrono::seconds(1)); +} + TEST(PubSubFragments, AsyncPubSubAsReliableData300kbInLossyConditionsSmallFragments) { PubSubReader reader(TEST_TOPIC_NAME); diff --git a/test/blackbox/PubSubReader.hpp b/test/blackbox/PubSubReader.hpp index e25b93311b7..5e853951382 100644 --- a/test/blackbox/PubSubReader.hpp +++ b/test/blackbox/PubSubReader.hpp @@ -457,6 +457,12 @@ class PubSubReader return *this; } + PubSubReader& mem_policy(const eprosima::fastrtps::rtps::MemoryManagementPolicy mem_policy) + { + subscriber_attr_.historyMemoryPolicy = mem_policy; + return *this; + } + PubSubReader& deadline_period(const eprosima::fastrtps::Duration_t deadline_period) { subscriber_attr_.qos.m_deadline.period = deadline_period; diff --git a/test/blackbox/utils/data_generators.cpp b/test/blackbox/utils/data_generators.cpp index 0db52e3d087..92cb1ca3d26 100644 --- a/test/blackbox/utils/data_generators.cpp +++ b/test/blackbox/utils/data_generators.cpp @@ -159,3 +159,27 @@ std::list default_data300kb_mix_data_generator(size_t max) return returnedValue; } + +const size_t data96kb_length = 96*1024; +std::list default_data96kb_data300kb_data_generator(size_t max) +{ + unsigned char index = 1; + size_t maximum = max ? max : 10; + std::list returnedValue(maximum); + + std::generate(returnedValue.begin(), returnedValue.end(), [&index] + { + Data1mb data; + size_t length = index % 2 != 0 ? data96kb_length : data300kb_length; + data.data().resize(length); + data.data()[0] = index; + for (size_t i = 1; i < length; ++i) + { + data.data()[i] = static_cast(i + data.data()[0]); + } + ++index; + return data; + }); + + return returnedValue; +}