Skip to content

Commit

Permalink
SequentialWriter to cache by message size instead of message count (r…
Browse files Browse the repository at this point in the history
…os2#530)

* Fixes ros2#464 - caches by message size and not message count

Signed-off-by: Jaison Titus <jaisontj@amazon.com>
  • Loading branch information
jaisontj authored Oct 9, 2020
1 parent 3dc568b commit 8a917af
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,10 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
std::chrono::seconds max_bagfile_duration;

// Intermediate cache to write multiple messages into the storage.
// `max_cache_size` is the amount of messages to hold in storage before writing to disk.
// `max_cache_size` is the number of bytes of messages to hold in storage
// before writing to disk.
uint64_t max_cache_size_;
uint64_t current_cache_size_;
std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> cache_;

// Used to track topic -> message count
Expand Down
6 changes: 4 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ void SequentialWriter::open(
max_bagfile_size_ = storage_options.max_bagfile_size;
max_bagfile_duration = std::chrono::seconds(storage_options.max_bagfile_duration);
max_cache_size_ = storage_options.max_cache_size;

cache_.reserve(max_cache_size_);
current_cache_size_ = 0u;

if (converter_options.output_serialization_format !=
converter_options.input_serialization_format)
Expand Down Expand Up @@ -249,11 +249,13 @@ void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessa
storage_->write(converted_msg);
} else {
cache_.push_back(converted_msg);
if (cache_.size() >= max_cache_size_) {
current_cache_size_ += converted_msg->serialized_data->buffer_length;
if (current_cache_size_ >= max_cache_size_) {
storage_->write(cache_);
// reset cache
cache_.clear();
cache_.reserve(max_cache_size_);
current_cache_size_ = 0u;
}
}
}
Expand Down
15 changes: 12 additions & 3 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "rosbag2_cpp/writer.hpp"

#include "rosbag2_storage/bag_metadata.hpp"
#include "rosbag2_storage/ros_helper.hpp"
#include "rosbag2_storage/topic_metadata.hpp"

#include "mock_converter.hpp"
Expand Down Expand Up @@ -265,13 +266,14 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf
}

TEST_F(SequentialWriterTest, only_write_after_cache_is_full) {
const size_t counter = 1000;
const uint64_t counter = 1000;
const uint64_t max_cache_size = 100;

std::string msg_content = "Hello";
const auto msg_length = msg_content.length();
EXPECT_CALL(
*storage_,
write(An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())).
Times(counter / max_cache_size);
Times(static_cast<int>(counter * msg_length / max_cache_size));
EXPECT_CALL(
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).Times(0);
Expand All @@ -284,6 +286,8 @@ TEST_F(SequentialWriterTest, only_write_after_cache_is_full) {

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data = rosbag2_storage::make_serialized_message(
msg_content.c_str(), msg_length);

storage_options_.max_bagfile_size = 0;
storage_options_.max_cache_size = max_cache_size;
Expand Down Expand Up @@ -314,8 +318,13 @@ TEST_F(SequentialWriterTest, do_not_use_cache_if_cache_size_is_zero) {

std::string rmw_format = "rmw_format";

std::string msg_content = "Hello";
auto msg_length = msg_content.length();
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data = rosbag2_storage::make_serialized_message(
msg_content.c_str(), msg_length);


storage_options_.max_bagfile_size = 0;
storage_options_.max_cache_size = max_cache_size;
Expand Down

0 comments on commit 8a917af

Please sign in to comment.