diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index 1bece470bd..8617e71385 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -118,8 +118,10 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter std::chrono::seconds max_bagfile_duration; // Intermediate cache to write multiple messages into the storage. - // `max_cache_size` is the amount of messages to hold in storage before writing to disk. + // `max_cache_size` is the number of bytes of messages to hold in storage + // before writing to disk. uint64_t max_cache_size_; + uint64_t current_cache_size_; std::vector> cache_; // Used to track topic -> message count diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index d8b17cb7cc..15d5e9a41e 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -89,8 +89,8 @@ void SequentialWriter::open( max_bagfile_size_ = storage_options.max_bagfile_size; max_bagfile_duration = std::chrono::seconds(storage_options.max_bagfile_duration); max_cache_size_ = storage_options.max_cache_size; - cache_.reserve(max_cache_size_); + current_cache_size_ = 0u; if (converter_options.output_serialization_format != converter_options.input_serialization_format) @@ -249,11 +249,13 @@ void SequentialWriter::write(std::shared_ptrwrite(converted_msg); } else { cache_.push_back(converted_msg); - if (cache_.size() >= max_cache_size_) { + current_cache_size_ += converted_msg->serialized_data->buffer_length; + if (current_cache_size_ >= max_cache_size_) { storage_->write(cache_); // reset cache cache_.clear(); cache_.reserve(max_cache_size_); + current_cache_size_ = 0u; } } } diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index 2553b4b135..fce161781d 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -25,6 +25,7 @@ #include "rosbag2_cpp/writer.hpp" #include "rosbag2_storage/bag_metadata.hpp" +#include "rosbag2_storage/ros_helper.hpp" #include "rosbag2_storage/topic_metadata.hpp" #include "mock_converter.hpp" @@ -265,13 +266,14 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf } TEST_F(SequentialWriterTest, only_write_after_cache_is_full) { - const size_t counter = 1000; + const uint64_t counter = 1000; const uint64_t max_cache_size = 100; - + std::string msg_content = "Hello"; + const auto msg_length = msg_content.length(); EXPECT_CALL( *storage_, write(An> &>())). - Times(counter / max_cache_size); + Times(static_cast(counter * msg_length / max_cache_size)); EXPECT_CALL( *storage_, write(An>())).Times(0); @@ -284,6 +286,8 @@ TEST_F(SequentialWriterTest, only_write_after_cache_is_full) { auto message = std::make_shared(); message->topic_name = "test_topic"; + message->serialized_data = rosbag2_storage::make_serialized_message( + msg_content.c_str(), msg_length); storage_options_.max_bagfile_size = 0; storage_options_.max_cache_size = max_cache_size; @@ -314,8 +318,13 @@ TEST_F(SequentialWriterTest, do_not_use_cache_if_cache_size_is_zero) { std::string rmw_format = "rmw_format"; + std::string msg_content = "Hello"; + auto msg_length = msg_content.length(); auto message = std::make_shared(); message->topic_name = "test_topic"; + message->serialized_data = rosbag2_storage::make_serialized_message( + msg_content.c_str(), msg_length); + storage_options_.max_bagfile_size = 0; storage_options_.max_cache_size = max_cache_size;