diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 1fa2cbb4dd..ac74fef080 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -593,3 +593,49 @@ TEST(TestExecutors, testSpinUntilFutureCompleteNodePtr) { rclcpp::shutdown(); } + +TYPED_TEST(TestExecutors, testIntraprocessRetrigger) { + using ExecutorType = TypeParam; + ExecutorType executor; + + const std::string kTopic = "~/test_service_intraprocess"; + const size_t kNumMessages = 100; + + auto ipc_node = std::make_shared("node"); + rclcpp::PublisherOptions po; + po.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + auto pub = ipc_node->create_publisher(kTopic, 1, po); + + std::atomic_int counter = 0; + auto count_subs = [&counter](std::shared_ptr) { + counter.fetch_add(1); + }; + + rclcpp::SubscriptionOptions so; + so.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + auto sub = ipc_node->create_subscription(kTopic, kNumMessages, count_subs, so); + + executor.add_node(ipc_node); + + EXPECT_EQ(0, counter.load()); + pub->publish(test_msgs::msg::Empty()); + + const std::chrono::milliseconds sleep_per_loop(10); + while (1u != counter.load()) { + rclcpp::sleep_for(sleep_per_loop); + executor.spin_some(); + } + EXPECT_EQ(1u, counter.load()); + + // reset counter + counter.store(0); + + for (size_t ii = 0; ii < kNumMessages; ++ii) + { + pub->publish(test_msgs::msg::Empty()); + } + auto timer = ipc_node->create_wall_timer(std::chrono::milliseconds(250), [&executor](){executor.cancel();}); + executor.spin(); + + EXPECT_EQ(kNumMessages, counter.load()); +}