Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[humble] Add recorder stop() API (backport #1300) #1396

Merged
merged 1 commit into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion rosbag2_transport/include/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,17 @@ class Recorder : public rclcpp::Node
return subscriptions_;
}

/// @brief Stopping recording.
/// @details The stop() is opposite to the record() operation. It will stop recording, dump
/// all buffers to the disk and close writer. The record() can be called again after stop().
ROSBAG2_TRANSPORT_PUBLIC
void stop();

ROSBAG2_TRANSPORT_PUBLIC
const rosbag2_cpp::Writer & get_writer_handle();

/// Pause the recording.
/// @brief Pause the recording.
/// @details Will keep writer open and skip messages upon arrival on subscriptions.
ROSBAG2_TRANSPORT_PUBLIC
void pause();

Expand Down
10 changes: 9 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,19 @@ Recorder::Recorder(
Recorder::~Recorder()
{
keyboard_handler_->delete_key_press_callback(toggle_paused_key_callback_handle_);
stop();
}


void Recorder::stop()
{
stop_discovery_ = true;
if (discovery_future_.valid()) {
discovery_future_.wait();
}

paused_ = true;
subscriptions_.clear();
writer_->close(); // Call writer->close() to finalize current bag file and write metadata

{
std::lock_guard<std::mutex> lock(event_publisher_thread_mutex_);
Expand All @@ -126,6 +133,7 @@ Recorder::~Recorder()

void Recorder::record()
{
paused_ = record_options_.start_paused;
topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides;
if (record_options_.rmw_serialization_format.empty()) {
throw std::runtime_error("No serialization format specified!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
snapshot_mode_ = storage_options.snapshot_mode;
(void) storage_options;
(void) converter_options;
writer_close_called_ = false;
}

void close() override {}
void close() override
{
writer_close_called_ = true;
}

void create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override
{
Expand Down Expand Up @@ -118,6 +122,11 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
return max_messages_per_file_;
}

bool closed_was_called() const
{
return writer_close_called_;
}

private:
std::unordered_map<std::string, rosbag2_storage::TopicMetadata> topics_;
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages_;
Expand All @@ -128,6 +137,7 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
rosbag2_cpp::bag_events::EventCallbackManager callback_manager_;
size_t file_number_ = 0;
size_t max_messages_per_file_ = 0;
bool writer_close_called_{false};
};

#endif // ROSBAG2_TRANSPORT__MOCK_SEQUENTIAL_WRITER_HPP_
54 changes: 53 additions & 1 deletion rosbag2_transport/test/rosbag2_transport/test_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are
pub_manager.setup_publisher(string_topic, string_message, 2);

rosbag2_transport::RecordOptions record_options =
{false, false, {string_topic, array_topic}, "rmw_format", 100ms};
{false, false, {string_topic, array_topic}, "rmw_format", 50ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -88,6 +88,58 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are
EXPECT_THAT(array_messages[0]->float32_values, Eq(array_message->float32_values));
}

TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop)
{
auto string_message = get_messages_strings()[1];
std::string string_topic = "/string_topic";

rosbag2_test_common::PublicationManager pub_manager;
pub_manager.setup_publisher(string_topic, string_message, 2);

rosbag2_transport::RecordOptions record_options =
{false, false, {string_topic}, "rmw_format", 50ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();

auto & writer = recorder->get_writer_handle();
auto & mock_writer = dynamic_cast<MockSequentialWriter &>(writer.get_implementation_handle());

start_async_spin(recorder);
ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str()));

pub_manager.run_publishers();

EXPECT_FALSE(mock_writer.closed_was_called());
recorder->stop();
EXPECT_TRUE(mock_writer.closed_was_called());

// Record one more time after stop()
recorder->record();

ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str()));
pub_manager.run_publishers();

size_t expected_messages = 4; // 4 because was running recorder-record() and publishers twice
auto ret = rosbag2_test_common::wait_until_shutdown(
std::chrono::seconds(5),
[&mock_writer, &expected_messages]() {
return mock_writer.get_messages().size() >= expected_messages;
});
auto recorded_messages = mock_writer.get_messages();
EXPECT_TRUE(ret) << "failed to capture expected messages in time";
EXPECT_THAT(recorded_messages, SizeIs(expected_messages));

auto recorded_topics = mock_writer.get_topics();
ASSERT_THAT(recorded_topics, SizeIs(1)) << "size=" << recorded_topics.size();
EXPECT_THAT(recorded_topics.at(string_topic).serialization_format, Eq("rmw_format"));
ASSERT_THAT(recorded_messages, SizeIs(expected_messages));
auto string_messages = filter_messages<test_msgs::msg::Strings>(
recorded_messages, string_topic);
ASSERT_THAT(string_messages, SizeIs(4));
EXPECT_THAT(string_messages[0]->string_value, Eq(string_message->string_value));
}

TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata)
{
auto string_message = get_messages_strings()[1];
Expand Down