From 02ded61c38058138939c7202f51e647c0fd5365f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Thu, 1 Jul 2021 16:25:44 +0200 Subject: [PATCH] Refs #11903. Add unit tests for publish modes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo González Moreno --- .../fastdds/rtps/messages/RTPSMessageGroup.h | 6 +- .../fastdds/rtps/writer/RTPSWriter.h | 32 +- test/unittest/rtps/flowcontrol/CMakeLists.txt | 35 +- .../FlowControllerPublishModesTests.cpp | 1356 +++++++++++++++++ 4 files changed, 1417 insertions(+), 12 deletions(-) create mode 100644 test/unittest/rtps/flowcontrol/FlowControllerPublishModesTests.cpp diff --git a/test/mock/rtps/RTPSMessageGroup/fastdds/rtps/messages/RTPSMessageGroup.h b/test/mock/rtps/RTPSMessageGroup/fastdds/rtps/messages/RTPSMessageGroup.h index ed401d0f6b8..3180ad4af73 100644 --- a/test/mock/rtps/RTPSMessageGroup/fastdds/rtps/messages/RTPSMessageGroup.h +++ b/test/mock/rtps/RTPSMessageGroup/fastdds/rtps/messages/RTPSMessageGroup.h @@ -48,8 +48,6 @@ class RTPSMessageGroup MOCK_METHOD0(flush_and_reset, void()); - MOCK_METHOD0(reset_current_bytes_processed, void()); - MOCK_METHOD0(get_current_bytes_processed, uint32_t()); void change_transmitter( @@ -63,6 +61,10 @@ class RTPSMessageGroup { } + void reset_current_bytes_processed() + { + } + }; } // namespace rtps diff --git a/test/mock/rtps/RTPSWriter/fastdds/rtps/writer/RTPSWriter.h b/test/mock/rtps/RTPSWriter/fastdds/rtps/writer/RTPSWriter.h index 6dfa7fa892d..776e818294f 100644 --- a/test/mock/rtps/RTPSWriter/fastdds/rtps/writer/RTPSWriter.h +++ b/test/mock/rtps/RTPSWriter/fastdds/rtps/writer/RTPSWriter.h @@ -41,16 +41,32 @@ class RTPSWriter : public Endpoint { public: + RTPSWriter() + { + static uint8_t entity_id = 0; + // Generate a guid. + m_guid.entityId.value[3] = ++entity_id; + } + virtual ~RTPSWriter() = default; virtual bool matched_reader_add( - const ReaderProxyData& ratt) = 0; + const ReaderProxyData&) + { + return false; + } virtual bool matched_reader_remove( - const GUID_t& ratt) = 0; + const GUID_t&) + { + return false; + } virtual bool matched_reader_is_matched( - const GUID_t& rguid) = 0; + const GUID_t&) + { + return false; + } WriterListener* getListener() const { @@ -83,8 +99,6 @@ class RTPSWriter : public Endpoint #endif // FASTDDS_STATISTICS // *INDENT-OFF* Uncrustify makes a mess with MOCK_METHOD macros - MOCK_CONST_METHOD0(getGuid, const GUID_t& ()); - MOCK_METHOD3(new_change, CacheChange_t* ( const std::function&, ChangeKind_t, @@ -127,6 +141,11 @@ class RTPSWriter : public Endpoint // *INDENT-ON* + const GUID_t& getGuid() + { + return m_guid; + } + virtual void updateAttributes( const WriterAttributes&) { @@ -217,14 +236,13 @@ class RTPSWriter : public Endpoint WriterListener* listener_; - const GUID_t m_guid; + GUID_t m_guid; LivelinessLostStatus liveliness_lost_status_; LocatorSelectorSender general_locator_selector_ = LocatorSelectorSender(*this, ResourceLimitedContainerConfig()); LocatorSelectorSender async_locator_selector_ = LocatorSelectorSender(*this, ResourceLimitedContainerConfig()); - }; } // namespace rtps diff --git a/test/unittest/rtps/flowcontrol/CMakeLists.txt b/test/unittest/rtps/flowcontrol/CMakeLists.txt index 1b6ea42e3f6..18a5355b771 100644 --- a/test/unittest/rtps/flowcontrol/CMakeLists.txt +++ b/test/unittest/rtps/flowcontrol/CMakeLists.txt @@ -22,10 +22,9 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER)) add_definitions(-D_WIN32_WINNT=0x0601) endif() - set(FLOWCONTROLLERFACTORYTESTS_SOURCE - FlowControllerFactoryTests.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp + set(FLOWCONTROLLER_COMMON_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/FlowControllerConsts.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/writer/LocatorSelectorSender.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp @@ -34,6 +33,12 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER)) ${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/ThroughputControllerDescriptor.cpp ) + set(FLOWCONTROLLERFACTORYTESTS_SOURCE + ${FLOWCONTROLLER_COMMON_SOURCE} + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp + FlowControllerFactoryTests.cpp + ) + add_executable(FlowControllerFactoryTests ${FLOWCONTROLLERFACTORYTESTS_SOURCE}) target_compile_definitions(FlowControllerFactoryTests PRIVATE FASTRTPS_NO_LIB $<$>,$>:__DEBUG> @@ -52,5 +57,29 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER)) ) endif() add_gtest(FlowControllerFactoryTests SOURCES ${FLOWCONTROLLERFACTORYTESTS_SOURCE}) + + set(FLOWCONTROLLERPUBLISHMODESTESTS_SOURCE + ${FLOWCONTROLLER_COMMON_SOURCE} + FlowControllerPublishModesTests.cpp + ) + + add_executable(FlowControllerPublishModesTests ${FLOWCONTROLLERPUBLISHMODESTESTS_SOURCE}) + target_compile_definitions(FlowControllerPublishModesTests PRIVATE FASTRTPS_NO_LIB + $<$>,$>:__DEBUG> + $<$:__INTERNALDEBUG> # Internal debug activated. + ) + target_include_directories(FlowControllerPublishModesTests PRIVATE ${GTEST_INCLUDE_DIRS} + ${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSWriter + ${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSMessageGroup + ${PROJECT_SOURCE_DIR}/include ${PROJECT_BINARY_DIR}/include + ${PROJECT_SOURCE_DIR}/src/cpp + ) + target_link_libraries(FlowControllerPublishModesTests ${GTEST_LIBRARIES} ${GMOCK_LIBRARIES}) + if(MSVC OR MSVC_IDE) + target_link_libraries(FlowControllerPublishModesTests ${PRIVACY} + iphlpapi Shlwapi + ) + endif() + add_gtest(FlowControllerPublishModesTests SOURCES ${FLOWCONTROLLERPUBLISHMODESTESTS_SOURCE}) endif() endif() diff --git a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesTests.cpp new file mode 100644 index 00000000000..27a00118a90 --- /dev/null +++ b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesTests.cpp @@ -0,0 +1,1356 @@ +#include +#include + +using namespace eprosima::fastdds::rtps; +using namespace testing; + +struct WriterDeliveryCallInfo +{ + void wait_changes_was_delivered( + size_t number_of_changes) + { + std::unique_lock lock(changes_delivered_mutex); + number_changes_delivered_cv.wait(lock, [&]() + { + return number_of_changes == changes_delivered.size(); + }); + } + + std::thread::id last_thread_delivering_sample; + + std::vector changes_delivered; + + std::mutex changes_delivered_mutex; + + std::condition_variable number_changes_delivered_cv; +}; + +#define INIT_CACHE_CHANGE(change, writer) \ + change.writerGUID = writer.getGuid(); \ + change.writer_info.previous = nullptr; \ + change.writer_info.next = nullptr; \ + change.serializedPayload.length = 10000; + +TEST(FlowControllerPublishModes, pure_sync_publish_mode) +{ + FlowControllerDescriptor flow_controller_descr; + FlowControllerImpl pure_sync(nullptr, + &flow_controller_descr); + pure_sync.init(); + + // Initialize callback to get info. + WriterDeliveryCallInfo send_call_info; + auto send_functor = [&send_call_info]( + eprosima::fastrtps::rtps::CacheChange_t*, + eprosima::fastrtps::rtps::RTPSMessageGroup&, + eprosima::fastrtps::rtps::LocatorSelectorSender&, + const std::chrono::time_point&) + { + send_call_info.last_thread_delivering_sample = std::this_thread::get_id(); + }; + + + // Instantiate writers. + eprosima::fastrtps::rtps::RTPSWriter writer1; + eprosima::fastrtps::rtps::RTPSWriter writer2; + + // Register writers. + pure_sync.register_writer(&writer1); + + eprosima::fastrtps::rtps::CacheChange_t change_writer1; + INIT_CACHE_CHANGE(change_writer1, writer1); + + eprosima::fastrtps::rtps::CacheChange_t change_writer2; + INIT_CACHE_CHANGE(change_writer2, writer2); + + // Testing add_new_sample. Writer will be able to deliver it. + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.general_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(pure_sync.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + EXPECT_EQ(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + + EXPECT_CALL(writer2, + deliver_sample_nts(&change_writer2, _, Ref(writer2.general_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(pure_sync.add_new_sample(&writer2, &change_writer2, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + EXPECT_EQ(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + + // Testing add_new_sample. Writer will not be able to deliver it. + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.general_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED))); + ASSERT_FALSE(pure_sync.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + EXPECT_EQ(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + + EXPECT_CALL(writer2, + deliver_sample_nts(&change_writer2, _, Ref(writer2.general_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED))); + ASSERT_FALSE(pure_sync.add_new_sample(&writer2, &change_writer2, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + EXPECT_EQ(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + + // Testing add_old_sample. Writers never be called. + ASSERT_FALSE(pure_sync.add_old_sample(&writer1, &change_writer1)); + ASSERT_FALSE(pure_sync.add_old_sample(&writer2, &change_writer2)); + + pure_sync.unregister_writer(&writer1); +} + +TEST(FlowControllerPublishModes, sync_publish_mode) +{ + FlowControllerDescriptor flow_controller_descr; + FlowControllerImpl sync(nullptr, + &flow_controller_descr); + sync.init(); + + // Instantiate writers. + eprosima::fastrtps::rtps::RTPSWriter writer1; + eprosima::fastrtps::rtps::RTPSWriter writer2; + + // Initialize callback to get info. + WriterDeliveryCallInfo send_call_info; + auto send_functor_adding = [&send_call_info]( + eprosima::fastrtps::rtps::CacheChange_t* change, + eprosima::fastrtps::rtps::RTPSMessageGroup&, + eprosima::fastrtps::rtps::LocatorSelectorSender&, + const std::chrono::time_point&) + { + send_call_info.last_thread_delivering_sample = std::this_thread::get_id(); + { + std::unique_lock lock(send_call_info.changes_delivered_mutex); + send_call_info.changes_delivered.push_back(change); + } + send_call_info.number_changes_delivered_cv.notify_one(); + }; + auto send_functor_not_adding = [&send_call_info]( + eprosima::fastrtps::rtps::CacheChange_t*, + eprosima::fastrtps::rtps::RTPSMessageGroup&, + eprosima::fastrtps::rtps::LocatorSelectorSender&, + const std::chrono::time_point&) + { + send_call_info.last_thread_delivering_sample = std::this_thread::get_id(); + }; + + // Register writers. + sync.register_writer(&writer1); + + eprosima::fastrtps::rtps::CacheChange_t change_writer1; + INIT_CACHE_CHANGE(change_writer1, writer1); + + eprosima::fastrtps::rtps::CacheChange_t change_writer2; + INIT_CACHE_CHANGE(change_writer2, writer2); + + // Testing add_new_sample. Writer will be able to deliver it. + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.general_locator_selector_), _)). + WillOnce(DoAll(send_functor_not_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(sync.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + EXPECT_EQ(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + + EXPECT_CALL(writer2, + deliver_sample_nts(&change_writer2, _, Ref(writer2.general_locator_selector_), _)). + WillOnce(DoAll(send_functor_not_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(sync.add_new_sample(&writer2, &change_writer2, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + EXPECT_EQ(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + + // Testing add_new_sample. Writer will not be able to deliver it. + auto& fail_call = EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.general_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED)); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)).After(fail_call). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(sync.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Testing add_old_sample. Writer will be able to deliver it. + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer1)); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Testing add_old_sample. Writer will not be able to deliver it. + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer1)); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Testing add_old_sample with a change already enqueued. + change_writer1.writer_info.previous = (eprosima::fastrtps::rtps::CacheChange_t*)1; + change_writer1.writer_info.next = (eprosima::fastrtps::rtps::CacheChange_t*)1; + + // Send 10 samples using add_old_sample. + INIT_CACHE_CHANGE(change_writer1, writer1); + INIT_CACHE_CHANGE(change_writer2, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer3; + INIT_CACHE_CHANGE(change_writer3, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer4; + INIT_CACHE_CHANGE(change_writer4, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer5; + INIT_CACHE_CHANGE(change_writer5, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer6; + INIT_CACHE_CHANGE(change_writer6, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer7; + INIT_CACHE_CHANGE(change_writer7, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer8; + INIT_CACHE_CHANGE(change_writer8, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer9; + INIT_CACHE_CHANGE(change_writer9, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer10; + INIT_CACHE_CHANGE(change_writer10, writer1); + + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer2, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer3, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer4, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer5, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer6, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer7, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer8, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer9, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer10, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer1)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer2)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer3)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer4)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer5)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer6)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer7)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer8)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer9)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer10)); + send_call_info.wait_changes_was_delivered(10); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Remove changes + EXPECT_CALL(writer1, + deliver_sample_nts(_, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor_adding, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))). + WillRepeatedly(Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer1)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer2)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer3)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer4)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer5)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer6)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer7)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer8)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer9)); + ASSERT_TRUE(sync.add_old_sample(&writer1, &change_writer10)); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + writer1.getMutex().lock(); + ASSERT_TRUE(nullptr == change_writer1.writer_info.next && + nullptr == change_writer1.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer2.writer_info.next && + nullptr != change_writer2.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer3.writer_info.next && + nullptr != change_writer3.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer4.writer_info.next && + nullptr != change_writer4.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer5.writer_info.next && + nullptr != change_writer5.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer6.writer_info.next && + nullptr != change_writer6.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer7.writer_info.next && + nullptr != change_writer7.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer8.writer_info.next && + nullptr != change_writer8.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer9.writer_info.next && + nullptr != change_writer9.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer10.writer_info.next && + nullptr != change_writer10.writer_info.previous); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + sync.remove_change(&change_writer10); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + sync.remove_change(&change_writer9); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + sync.remove_change(&change_writer8); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + sync.remove_change(&change_writer7); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + sync.remove_change(&change_writer6); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + sync.remove_change(&change_writer5); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + sync.remove_change(&change_writer4); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + sync.remove_change(&change_writer3); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + sync.remove_change(&change_writer2); + writer1.getMutex().unlock(); + ASSERT_TRUE(nullptr == change_writer2.writer_info.next && + nullptr == change_writer2.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer3.writer_info.next && + nullptr == change_writer3.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer4.writer_info.next && + nullptr == change_writer4.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer5.writer_info.next && + nullptr == change_writer5.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer6.writer_info.next && + nullptr == change_writer6.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer7.writer_info.next && + nullptr == change_writer7.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer8.writer_info.next && + nullptr == change_writer8.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer9.writer_info.next && + nullptr == change_writer9.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer10.writer_info.next && + nullptr == change_writer10.writer_info.previous); + + sync.unregister_writer(&writer1); +} + +TEST(FlowControllerPublishModes, async_publish_mode) +{ + FlowControllerDescriptor flow_controller_descr; + FlowControllerImpl async(nullptr, + &flow_controller_descr); + async.init(); + + // Instantiate writers. + eprosima::fastrtps::rtps::RTPSWriter writer1; + eprosima::fastrtps::rtps::RTPSWriter writer2; + + // Initialize callback to get info. + WriterDeliveryCallInfo send_call_info; + auto send_functor = [&send_call_info]( + eprosima::fastrtps::rtps::CacheChange_t* change, + eprosima::fastrtps::rtps::RTPSMessageGroup&, + eprosima::fastrtps::rtps::LocatorSelectorSender&, + const std::chrono::time_point&) + { + send_call_info.last_thread_delivering_sample = std::this_thread::get_id(); + { + std::unique_lock lock(send_call_info.changes_delivered_mutex); + send_call_info.changes_delivered.push_back(change); + } + send_call_info.number_changes_delivered_cv.notify_one(); + }; + + // Register writers. + async.register_writer(&writer1); + + eprosima::fastrtps::rtps::CacheChange_t change_writer1; + INIT_CACHE_CHANGE(change_writer1, writer1); + + eprosima::fastrtps::rtps::CacheChange_t change_writer2; + INIT_CACHE_CHANGE(change_writer2, writer2); + + // Testing add_new_sample. Writer will be able to deliver it. + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Testing add_new_sample. Writer will not be able to deliver it. + auto& fail_call = EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED)); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)).After(fail_call). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Testing add_old_sample. Writer will be able to deliver it. + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer1)); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Testing add_old_sample. Writer will not be able to deliver it. + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer1)); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Testing add_old_sample with a change already enqueued. + change_writer1.writer_info.previous = (eprosima::fastrtps::rtps::CacheChange_t*)1; + change_writer1.writer_info.next = (eprosima::fastrtps::rtps::CacheChange_t*)1; + + // Send 10 samples using add_new_sample. + INIT_CACHE_CHANGE(change_writer1, writer1); + INIT_CACHE_CHANGE(change_writer2, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer3; + INIT_CACHE_CHANGE(change_writer3, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer4; + INIT_CACHE_CHANGE(change_writer4, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer5; + INIT_CACHE_CHANGE(change_writer5, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer6; + INIT_CACHE_CHANGE(change_writer6, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer7; + INIT_CACHE_CHANGE(change_writer7, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer8; + INIT_CACHE_CHANGE(change_writer8, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer9; + INIT_CACHE_CHANGE(change_writer9, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer10; + INIT_CACHE_CHANGE(change_writer10, writer1); + + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer2, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer3, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer4, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer5, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer6, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer7, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer8, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer9, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer10, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer2, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer3, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer4, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer5, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer6, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer7, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer8, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer9, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer10, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(10); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Send 10 samples using add_old_sample. + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer2, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer3, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer4, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer5, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer6, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer7, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer8, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer9, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer10, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer1)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer2)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer3)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer4)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer5)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer6)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer7)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer8)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer9)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer10)); + send_call_info.wait_changes_was_delivered(10); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Remove changes after add_new_sample. + EXPECT_CALL(writer1, + deliver_sample_nts(_, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))). + WillRepeatedly(Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED)); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer2, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer3, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer4, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer5, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer6, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer7, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer8, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer9, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer10, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + writer1.getMutex().lock(); + ASSERT_TRUE(nullptr == change_writer1.writer_info.next && + nullptr == change_writer1.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer2.writer_info.next && + nullptr != change_writer2.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer3.writer_info.next && + nullptr != change_writer3.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer4.writer_info.next && + nullptr != change_writer4.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer5.writer_info.next && + nullptr != change_writer5.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer6.writer_info.next && + nullptr != change_writer6.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer7.writer_info.next && + nullptr != change_writer7.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer8.writer_info.next && + nullptr != change_writer8.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer9.writer_info.next && + nullptr != change_writer9.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer10.writer_info.next && + nullptr != change_writer10.writer_info.previous); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer10); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer9); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer8); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer7); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer6); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer5); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer4); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer3); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer2); + writer1.getMutex().unlock(); + ASSERT_TRUE(nullptr == change_writer2.writer_info.next && + nullptr == change_writer2.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer3.writer_info.next && + nullptr == change_writer3.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer4.writer_info.next && + nullptr == change_writer4.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer5.writer_info.next && + nullptr == change_writer5.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer6.writer_info.next && + nullptr == change_writer6.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer7.writer_info.next && + nullptr == change_writer7.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer8.writer_info.next && + nullptr == change_writer8.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer9.writer_info.next && + nullptr == change_writer9.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer10.writer_info.next && + nullptr == change_writer10.writer_info.previous); + + // Remove changes after add_old_sample. + EXPECT_CALL(writer1, + deliver_sample_nts(_, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))). + WillRepeatedly(Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer1)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer2)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer3)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer4)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer5)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer6)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer7)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer8)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer9)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer10)); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + writer1.getMutex().lock(); + ASSERT_TRUE(nullptr == change_writer1.writer_info.next && + nullptr == change_writer1.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer2.writer_info.next && + nullptr != change_writer2.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer3.writer_info.next && + nullptr != change_writer3.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer4.writer_info.next && + nullptr != change_writer4.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer5.writer_info.next && + nullptr != change_writer5.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer6.writer_info.next && + nullptr != change_writer6.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer7.writer_info.next && + nullptr != change_writer7.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer8.writer_info.next && + nullptr != change_writer8.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer9.writer_info.next && + nullptr != change_writer9.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer10.writer_info.next && + nullptr != change_writer10.writer_info.previous); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer10); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer9); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer8); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer7); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer6); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer5); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer4); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer3); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer2); + writer1.getMutex().unlock(); + ASSERT_TRUE(nullptr == change_writer2.writer_info.next && + nullptr == change_writer2.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer3.writer_info.next && + nullptr == change_writer3.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer4.writer_info.next && + nullptr == change_writer4.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer5.writer_info.next && + nullptr == change_writer5.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer6.writer_info.next && + nullptr == change_writer6.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer7.writer_info.next && + nullptr == change_writer7.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer8.writer_info.next && + nullptr == change_writer8.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer9.writer_info.next && + nullptr == change_writer9.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer10.writer_info.next && + nullptr == change_writer10.writer_info.previous); + + async.unregister_writer(&writer1); +} + +struct FlowControllerLimitedAsyncPublishModeMock : FlowControllerLimitedAsyncPublishMode +{ + FlowControllerLimitedAsyncPublishModeMock( + eprosima::fastrtps::rtps::RTPSParticipantImpl* participant, + const FlowControllerDescriptor* descriptor) + : FlowControllerLimitedAsyncPublishMode(participant, descriptor) + { + group_mock = &group; + } + + static eprosima::fastrtps::rtps::RTPSMessageGroup* get_group() + { + return group_mock; + } + + static eprosima::fastrtps::rtps::RTPSMessageGroup* group_mock; +}; +eprosima::fastrtps::rtps::RTPSMessageGroup* FlowControllerLimitedAsyncPublishModeMock::group_mock = nullptr; + +TEST(FlowControllerPublishModes, limited_async_publish_mode) +{ + FlowControllerDescriptor flow_controller_descr; + flow_controller_descr.max_bytes_per_period = 10200; + flow_controller_descr.period_ms = 10; + FlowControllerImpl async(nullptr, + &flow_controller_descr); + async.init(); + + // Instantiate writers. + eprosima::fastrtps::rtps::RTPSWriter writer1; + eprosima::fastrtps::rtps::RTPSWriter writer2; + + // Initialize callback to get info. + WriterDeliveryCallInfo send_call_info; + auto send_functor = [&send_call_info]( + eprosima::fastrtps::rtps::CacheChange_t* change, + eprosima::fastrtps::rtps::RTPSMessageGroup&, + eprosima::fastrtps::rtps::LocatorSelectorSender&, + const std::chrono::time_point&) + { + send_call_info.last_thread_delivering_sample = std::this_thread::get_id(); + { + std::unique_lock lock(send_call_info.changes_delivered_mutex); + send_call_info.changes_delivered.push_back(change); + } + send_call_info.number_changes_delivered_cv.notify_one(); + }; + + // Register writers. + async.register_writer(&writer1); + + eprosima::fastrtps::rtps::CacheChange_t change_writer1; + INIT_CACHE_CHANGE(change_writer1, writer1); + + eprosima::fastrtps::rtps::CacheChange_t change_writer2; + INIT_CACHE_CHANGE(change_writer2, writer2); + + // Testing add_new_sample. Writer will be able to deliver it. + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), get_current_bytes_processed()).WillOnce(Return( + 0)); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Testing add_new_sample. Writer will not be able to deliver it. + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).Times(2).WillRepeatedly(Return( + 0)); + auto& fail_call = EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED)); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)).After(fail_call). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Testing add_old_sample. Writer will be able to deliver it. + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), get_current_bytes_processed()).WillOnce(Return( + 0)); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer1)); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Testing add_old_sample. Writer will not be able to deliver it. + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).Times(2).WillRepeatedly(Return( + 0)); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer1)); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Testing add_old_sample with a change already enqueued. + change_writer1.writer_info.previous = (eprosima::fastrtps::rtps::CacheChange_t*)1; + change_writer1.writer_info.next = (eprosima::fastrtps::rtps::CacheChange_t*)1; + + // Send 10 samples using add_new_sample. + INIT_CACHE_CHANGE(change_writer1, writer1); + INIT_CACHE_CHANGE(change_writer2, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer3; + INIT_CACHE_CHANGE(change_writer3, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer4; + INIT_CACHE_CHANGE(change_writer4, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer5; + INIT_CACHE_CHANGE(change_writer5, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer6; + INIT_CACHE_CHANGE(change_writer6, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer7; + INIT_CACHE_CHANGE(change_writer7, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer8; + INIT_CACHE_CHANGE(change_writer8, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer9; + INIT_CACHE_CHANGE(change_writer9, writer1); + eprosima::fastrtps::rtps::CacheChange_t change_writer10; + INIT_CACHE_CHANGE(change_writer10, writer1); + + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).Times(10).WillRepeatedly(Return( + 0)); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer2, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer3, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer4, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer5, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer6, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer7, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer8, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer9, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer10, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer2, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer3, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer4, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer5, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer6, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer7, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer8, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer9, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer10, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(10); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Send 10 samples using add_old_sample. + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).Times(10).WillRepeatedly(Return( + 0)); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer2, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer3, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer4, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer5, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer6, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer7, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer8, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer9, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer10, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer1)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer2)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer3)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer4)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer5)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer6)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer7)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer8)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer9)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer10)); + send_call_info.wait_changes_was_delivered(10); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Remove changes after add_new_sample. + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).WillRepeatedly(Return( + 0)); + EXPECT_CALL(writer1, + deliver_sample_nts(_, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))). + WillRepeatedly(Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED)); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer2, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer3, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer4, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer5, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer6, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer7, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer8, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer9, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer10, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + writer1.getMutex().lock(); + ASSERT_TRUE(nullptr == change_writer1.writer_info.next && + nullptr == change_writer1.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer2.writer_info.next && + nullptr != change_writer2.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer3.writer_info.next && + nullptr != change_writer3.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer4.writer_info.next && + nullptr != change_writer4.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer5.writer_info.next && + nullptr != change_writer5.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer6.writer_info.next && + nullptr != change_writer6.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer7.writer_info.next && + nullptr != change_writer7.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer8.writer_info.next && + nullptr != change_writer8.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer9.writer_info.next && + nullptr != change_writer9.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer10.writer_info.next && + nullptr != change_writer10.writer_info.previous); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer10); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer9); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer8); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer7); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer6); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer5); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer4); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer3); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer2); + writer1.getMutex().unlock(); + ASSERT_TRUE(nullptr == change_writer2.writer_info.next && + nullptr == change_writer2.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer3.writer_info.next && + nullptr == change_writer3.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer4.writer_info.next && + nullptr == change_writer4.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer5.writer_info.next && + nullptr == change_writer5.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer6.writer_info.next && + nullptr == change_writer6.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer7.writer_info.next && + nullptr == change_writer7.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer8.writer_info.next && + nullptr == change_writer8.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer9.writer_info.next && + nullptr == change_writer9.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer10.writer_info.next && + nullptr == change_writer10.writer_info.previous); + + // Remove changes after add_old_sample. + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).WillRepeatedly(Return( + 0)); + EXPECT_CALL(writer1, + deliver_sample_nts(_, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))). + WillRepeatedly(Return(eprosima::fastrtps::rtps::DeliveryRetCode::NOT_DELIVERED)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer1)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer2)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer3)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer4)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer5)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer6)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer7)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer8)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer9)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer10)); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + writer1.getMutex().lock(); + ASSERT_TRUE(nullptr == change_writer1.writer_info.next && + nullptr == change_writer1.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer2.writer_info.next && + nullptr != change_writer2.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer3.writer_info.next && + nullptr != change_writer3.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer4.writer_info.next && + nullptr != change_writer4.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer5.writer_info.next && + nullptr != change_writer5.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer6.writer_info.next && + nullptr != change_writer6.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer7.writer_info.next && + nullptr != change_writer7.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer8.writer_info.next && + nullptr != change_writer8.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer9.writer_info.next && + nullptr != change_writer9.writer_info.previous); + ASSERT_TRUE(nullptr != change_writer10.writer_info.next && + nullptr != change_writer10.writer_info.previous); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer10); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer9); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer8); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer7); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer6); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer5); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer4); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer3); + writer1.getMutex().unlock(); + writer1.getMutex().lock(); + async.remove_change(&change_writer2); + writer1.getMutex().unlock(); + ASSERT_TRUE(nullptr == change_writer2.writer_info.next && + nullptr == change_writer2.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer3.writer_info.next && + nullptr == change_writer3.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer4.writer_info.next && + nullptr == change_writer4.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer5.writer_info.next && + nullptr == change_writer5.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer6.writer_info.next && + nullptr == change_writer6.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer7.writer_info.next && + nullptr == change_writer7.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer8.writer_info.next && + nullptr == change_writer8.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer9.writer_info.next && + nullptr == change_writer9.writer_info.previous); + ASSERT_TRUE(nullptr == change_writer10.writer_info.next && + nullptr == change_writer10.writer_info.previous); + + // Sending 10 samples applying limitations with add_new_sample. + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer2, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer3, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer4, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer5, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer6, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer7, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer8, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer9, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer10, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + auto& prev = EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()). + Times(10).WillRepeatedly(Return(10100)); + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).After(prev). + WillRepeatedly(Return(0)); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer1, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + auto& prev2 = EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()). + Times(10).WillRepeatedly(Return(10100)); + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).After(prev2). + WillRepeatedly(Return(0)); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer2, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(2); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + auto& prev3 = EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()). + Times(10).WillRepeatedly(Return(10100)); + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).After(prev3). + WillRepeatedly(Return(0)); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer3, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(3); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + auto& prev4 = EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()). + Times(10).WillRepeatedly(Return(10100)); + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).After(prev4). + WillRepeatedly(Return(0)); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer4, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(4); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer5, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer6, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer7, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer8, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer9, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + ASSERT_TRUE(async.add_new_sample(&writer1, &change_writer10, + std::chrono::steady_clock::now() + std::chrono::hours(24))); + send_call_info.wait_changes_was_delivered(10); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + // Sending 10 samples applying limitations with add_old_sample. + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer1, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer2, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer3, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer4, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer5, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer6, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer7, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer8, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer9, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + EXPECT_CALL(writer1, + deliver_sample_nts(&change_writer10, _, Ref(writer1.async_locator_selector_), _)). + WillOnce(Return(eprosima::fastrtps::rtps::DeliveryRetCode::EXCEEDED_LIMIT)). + WillOnce(DoAll(send_functor, Return(eprosima::fastrtps::rtps::DeliveryRetCode::DELIVERED))); + auto& old_prev = EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()). + Times(10).WillRepeatedly(Return(10100)); + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).After(old_prev). + WillRepeatedly(Return(0)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer1)); + send_call_info.wait_changes_was_delivered(1); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + auto& old_prev2 = EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()). + Times(10).WillRepeatedly(Return(10100)); + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).After(old_prev2). + WillRepeatedly(Return(0)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer2)); + send_call_info.wait_changes_was_delivered(2); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + auto& old_prev3 = EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()). + Times(10).WillRepeatedly(Return(10100)); + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).After(old_prev3). + WillRepeatedly(Return(0)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer3)); + send_call_info.wait_changes_was_delivered(3); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + auto& old_prev4 = EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()). + Times(10).WillRepeatedly(Return(10100)); + EXPECT_CALL(*FlowControllerLimitedAsyncPublishModeMock::get_group(), + get_current_bytes_processed()).After(old_prev4). + WillRepeatedly(Return(0)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer4)); + send_call_info.wait_changes_was_delivered(4); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer5)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer6)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer7)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer8)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer9)); + ASSERT_TRUE(async.add_old_sample(&writer1, &change_writer10)); + send_call_info.wait_changes_was_delivered(10); + EXPECT_NE(std::this_thread::get_id(), send_call_info.last_thread_delivering_sample); + send_call_info.changes_delivered.clear(); + + async.unregister_writer(&writer1); +} + +int main( + int argc, + char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}