diff --git a/test/unittest/transport/SharedMemTests.cpp b/test/unittest/transport/SharedMemTests.cpp index 8459305fbbd..6cdc2ff7dc4 100644 --- a/test/unittest/transport/SharedMemTests.cpp +++ b/test/unittest/transport/SharedMemTests.cpp @@ -34,6 +34,8 @@ #include #include +#include "../logging/mock/MockConsumer.h" + using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; using namespace eprosima::fastdds; @@ -1409,20 +1411,18 @@ TEST_F(SHMTransportTests, port_not_ok_listener_recover) thread_listener.join(); } +//! This test has been updated to avoid flakiness #20993 TEST_F(SHMTransportTests, buffer_recover) { auto shared_mem_manager = SharedMemManager::create(domain_name); - auto segment = shared_mem_manager->create_segment(3, 3); shared_mem_manager->remove_port(1); auto pub_sub1_write = shared_mem_manager->open_port(1, 8, 1000, SharedMemGlobal::Port::OpenMode::Write); - shared_mem_manager->remove_port(2); auto pub_sub2_write = shared_mem_manager->open_port(2, 8, 1000, SharedMemGlobal::Port::OpenMode::Write); auto sub1_read = shared_mem_manager->open_port(1, 8, 1000, SharedMemGlobal::Port::OpenMode::ReadExclusive); - auto sub2_read = shared_mem_manager->open_port(2, 8, 1000, SharedMemGlobal::Port::OpenMode::ReadExclusive); bool exit_listeners = false; @@ -1461,10 +1461,8 @@ TEST_F(SHMTransportTests, buffer_recover) if (buffer) { - { - std::lock_guard lock(received_mutex); - listener2_recv_count.fetch_add(1); - } + listener2_recv_count.fetch_add(1); + //std::cout << "Listener2 received " << listener2_recv_count.load() << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(listener2_sleep_ms)); buffer.reset(); received_cv.notify_one(); @@ -1472,12 +1470,19 @@ TEST_F(SHMTransportTests, buffer_recover) } }); + Log::ClearConsumers(); + Log::Flush(); + eprosima::fastdds::dds::MockConsumer* mock_consumer = new eprosima::fastdds::dds::MockConsumer("RTPS_TRANSPORT_SHM"); + Log::RegisterConsumer(std::unique_ptr(mock_consumer)); + Log::SetVerbosity(Log::Warning); + Log::SetErrorStringFilter(std::regex("Buffer is being invalidated.*")); + // Test 1 (without port overflow) uint32_t send_counter = 0u; bool is_port_ok = false; - while (listener1_recv_count.load() < 16u) + while (listener2_recv_count.load() < 32u) { { // The segment should never overflow @@ -1506,14 +1511,20 @@ TEST_F(SHMTransportTests, buffer_recover) } } - // The slow listener is 4 times slower than the fast one - ASSERT_LT(listener1_recv_count.load() * 3, listener2_recv_count.load()); - ASSERT_GT(listener1_recv_count.load(), listener2_recv_count.load() / 5); + // The ration between the listeners activity (sleep) should correspond to the amount of entries read plus lost + auto consumed_entries = mock_consumer->ConsumedEntries().size(); // buffer reset warnings: non-read samples std::cout << "Test1:" << " Listener1_recv_count " << listener1_recv_count.load() << " Listener2_recv_count " << listener2_recv_count.load() + << " Warning entries consumed: " << consumed_entries << std::endl; - + //ASSERT_EQ((listener1_recv_count.load() * (listener1_sleep_ms/listener2_sleep_ms)), + // (listener2_recv_count.load() + consumed_entries)); + //ASSERT_EQ((listener1_recv_count.load()+consumed_entries), listener2_recv_count.load()); + // The slow listener is 4 times slower than the fast one + EXPECT_LT(listener1_recv_count.load() * 3, listener2_recv_count.load()); + EXPECT_GE(listener1_recv_count.load() * 4, listener2_recv_count.load()); + ASSERT_EQ(1, 0); // Test 2 (with port overflow) listener2_sleep_ms = 0u; send_counter = 0u; @@ -1580,6 +1591,9 @@ TEST_F(SHMTransportTests, buffer_recover) thread_listener1.join(); thread_listener2.join(); + + Log::Reset(); + Log::KillThread(); } TEST_F(SHMTransportTests, remote_segments_free)