Skip to content

Commit

Permalink
Handle exceptions in event publisher and discovery threads
Browse files Browse the repository at this point in the history
- Exceptions potentially may come from the underlying node operations
when we are invalidating context via rclcpp::shutdown(). We need to have
possibility to correct finish recording in this case.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
morlov-apexai committed Jun 11, 2023
1 parent b04f79f commit 1c79e8a
Showing 1 changed file with 33 additions and 15 deletions.
48 changes: 33 additions & 15 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,12 @@ void RecorderImpl::record()
response->paused = is_paused();
});

split_event_pub_ =
node->create_publisher<rosbag2_interfaces::msg::WriteSplitEvent>("events/write_split", 1);

// Start the thread that will publish events
event_publisher_thread_ = std::thread(&RecorderImpl::event_publisher_thread_main, this);

split_event_pub_ =
node->create_publisher<rosbag2_interfaces::msg::WriteSplitEvent>("events/write_split", 1);
rosbag2_cpp::bag_events::WriterEventCallbacks callbacks;
callbacks.write_split_callback =
[this](rosbag2_cpp::bag_events::BagSplitInfo & info) {
Expand Down Expand Up @@ -328,7 +329,17 @@ void RecorderImpl::event_publisher_thread_main()
auto message = rosbag2_interfaces::msg::WriteSplitEvent();
message.closed_file = bag_split_info_.closed_file;
message.opened_file = bag_split_info_.opened_file;
split_event_pub_->publish(message);
try {
split_event_pub_->publish(message);
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
node->get_logger(),
"Failed to publish message on '/events/write_split' topic. \nError: " << e.what());
} catch (...) {
RCLCPP_ERROR_STREAM(
node->get_logger(),
"Failed to publish message on '/events/write_split' topic.");
}
}
}
RCLCPP_INFO(node->get_logger(), "Event publisher thread: Exiting");
Expand Down Expand Up @@ -373,19 +384,26 @@ bool RecorderImpl::is_paused()
void RecorderImpl::topics_discovery()
{
while (rclcpp::ok() && stop_discovery_ == false) {
auto topics_to_subscribe =
get_requested_or_available_topics();
for (const auto & topic_and_type : topics_to_subscribe) {
warn_if_new_qos_for_subscribed_topic(topic_and_type.first);
}
auto missing_topics = get_missing_topics(topics_to_subscribe);
subscribe_topics(missing_topics);
try {
auto topics_to_subscribe = get_requested_or_available_topics();
for (const auto & topic_and_type : topics_to_subscribe) {
warn_if_new_qos_for_subscribed_topic(topic_and_type.first);
}
auto missing_topics = get_missing_topics(topics_to_subscribe);
subscribe_topics(missing_topics);

if (!record_options_.topics.empty() && subscriptions_.size() == record_options_.topics.size()) {
RCLCPP_INFO(
node->get_logger(),
"All requested topics are subscribed. Stopping discovery...");
return;
if (!record_options_.topics.empty() &&
subscriptions_.size() == record_options_.topics.size())
{
RCLCPP_INFO(
node->get_logger(),
"All requested topics are subscribed. Stopping discovery...");
return;
}
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(node->get_logger(), "Failure in topics discovery.\nError: " << e.what());
} catch (...) {
RCLCPP_ERROR_STREAM(node->get_logger(), "Failure in topics discovery.");
}
std::this_thread::sleep_for(record_options_.topic_polling_interval);
}
Expand Down

0 comments on commit 1c79e8a

Please sign in to comment.