Skip to content

Commit

Permalink
Add config option to use storage_id parameter in benchmark_launch.py (#…
Browse files Browse the repository at this point in the history
…1303) (#1318)

* Add config option to use storage_id parameter in benchmark_launch.py

- Added ability to run performance benchmarking with multiple
storage plugins

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
mergify[bot] authored May 24, 2023
1 parent 7835f5b commit 92e8aa5
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rosbag2_performance_benchmarking:
no_transport: False # Whether to run storage-only or end-to-end (including transport) benchmark
preserve_bags: False # Whether to leave bag files after experiment (and between runs). Some configurations can take lots of space!
parameters: # Each combination of parameters in this section will be benchmarked
storage_id: ["mcap", "sqlite3"]
max_cache_size: [100000000, 500000000]
max_bag_size: [0, 1000000000]
compression: [""]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rosbag2_performance_benchmarking:
no_transport: True # Whether to run storage-only or end-to-end (including transport) benchmark
preserve_bags: False # Whether to leave bag files after experiment (and between runs). Some configurations can take lots of space!
parameters: # Each combination of parameters in this section will be benchmarked
storage_id: ["mcap", "sqlite3"]
max_cache_size: [100000000, 500000000]
max_bag_size: [0, 1000000000]
compression: [""]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rosbag2_performance_benchmarking:
no_transport: False # Whether to run storage-only or end-to-end (including transport) benchmark
preserve_bags: False # Whether to leave bag files after experiment (and between runs). Some configurations can take lots of space!
parameters: # Each combination of parameters in this section will be benchmarked
storage_id: ["mcap", "sqlite3"]
max_cache_size: [100000000]
max_bag_size: [0]
compression: [""]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rosbag2_performance_benchmarking:
no_transport: True # Whether to run storage-only or end-to-end (including transport) benchmark
preserve_bags: False # Whether to leave bag files after experiment (and between runs). Some configurations can take lots of space!
parameters: # Each combination of parameters in this section will be benchmarked
storage_id: ["mcap", "sqlite3"]
max_cache_size: [100000000]
max_bag_size: [0]
compression: [""]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,9 @@
import time

from ament_index_python import get_package_share_directory

import launch

import launch_ros
from rosbag2_py import get_default_storage_id

import yaml

Expand Down Expand Up @@ -197,24 +196,31 @@ def _producer_node_exited(event, context):

# Handle clearing bag files
if not node_params['preserve_bags']:
bag_files = pathlib.Path.cwd().joinpath(node_params['bag_folder']).glob('*.db3')
stats_path = pathlib.Path.cwd().joinpath(node_params['bag_folder'], 'bagfiles_info.yaml')
stats = {
'total_size': 0,
'bagfiles': []
}

# Delete rosbag files
for f in bag_files:
filesize = f.stat().st_size
f.unlink()
stats['bagfiles'].append({f.name: {'size': filesize}})
stats['total_size'] += filesize

# Dump files size information
with open(stats_path, 'w') as stats_file:
yaml.dump(stats, stats_file)
storage_id = get_default_storage_id()
if node_params['storage_id'] != '':
storage_id = node_params['storage_id']
if storage_id == 'sqlite3' or storage_id == 'mcap':
file_ext_mask = '*.mcap' if storage_id == 'mcap' else '*.db3'
bag_files = pathlib.Path.cwd().joinpath(node_params['bag_folder']).glob(file_ext_mask)
stats_path = pathlib.Path.cwd().joinpath(node_params['bag_folder'],
'bagfiles_info.yaml')
stats = {
'total_size': 0,
'bagfiles': []
}

# Delete rosbag files
for f in bag_files:
filesize = f.stat().st_size
f.unlink()
stats['bagfiles'].append({f.name: {'size': filesize}})
stats['total_size'] += filesize

# Dump files size information
with open(stats_path, 'w') as stats_file:
yaml.dump(stats, stats_file)
else:
print(f"Can't delete bag files. Unsupported storage_id = {storage_id}")
# If we have non empty rosbag PID, then we need to kill it (end-to-end transport case)
if _rosbag_pid is not None and transport:
os.kill(_rosbag_pid, signal.SIGINT)
Expand Down Expand Up @@ -270,6 +276,7 @@ def generate_launch_description():
# Producers options
producers_params = bench_cfg['benchmark']['parameters']

storage_id = producers_params.get('storage_id', [''])
max_cache_size_params = producers_params.get('max_cache_size')
max_bag_size_params = producers_params.get('max_bag_size')
compression_params = producers_params.get('compression')
Expand Down Expand Up @@ -298,7 +305,8 @@ def __generate_cross_section_parameter(i,
compression_queue_size,
compression_threads,
storage_config,
max_bag_size):
max_bag_size,
storage):
# Storage conf parameter for each producer
st_conf_filename = storage_config.replace('.yaml', '')
storage_conf_path = ''
Expand All @@ -315,8 +323,9 @@ def __generate_cross_section_parameter(i,

# Generates unique title for producer
node_title = 'run_' + \
'{i}_{cache}_{comp}_{comp_q}_{comp_t}_{st_conf}_{bag_size}'.format(
'{i}_{storage}_{cache}_{comp}_{comp_q}_{comp_t}_{st_conf}_{bag_size}'.format(
i=i,
storage=storage,
cache=cache,
comp=compression if compression else 'default_compression',
comp_q=compression_queue_size,
Expand Down Expand Up @@ -351,7 +360,8 @@ def __generate_cross_section_parameter(i,
'compression_threads': compression_threads,
'storage_config_file': str(storage_conf_path),
'config_file': str(_producers_cfg_path),
'max_bag_size': max_bag_size
'max_bag_size': max_bag_size,
'storage_id': str(storage)
}
)

Expand All @@ -364,14 +374,16 @@ def __generate_cross_section_parameter(i,
compression_queue_size,
compression_threads,
storage_config,
max_bag_size)
max_bag_size,
storage)
for i in range(0, repeat_each)
for cache in max_cache_size_params
for compression in compression_params
for compression_queue_size in compression_queue_size_params
for compression_threads in compression_threads_params
for storage_config in storage_config_file_params
for max_bag_size in max_bag_size_params
for storage in storage_id
]

ld = launch.LaunchDescription()
Expand All @@ -391,6 +403,8 @@ def __generate_cross_section_parameter(i,
{'compression_threads': producer_param['compression_threads']}
]

if producer_param['storage_id'] != '':
parameters.append({'storage_id': producer_param['storage_id']})
if producer_param['storage_config_file'] != '':
parameters.append({'storage_config_file': producer_param['storage_config_file']})
if producer_param['compression_format'] != '':
Expand All @@ -415,6 +429,11 @@ def __generate_cross_section_parameter(i,

# ROS2 bag process for recording messages
rosbag_args = []
if producer_param['storage_id']:
rosbag_args += [
'-s',
str(producer_param['storage_id'])
]
if producer_param['storage_config_file']:
rosbag_args += [
'--storage-config-file',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

<depend>rclcpp</depend>
<depend>rosbag2_compression</depend>
<depend>rosbag2_py</depend>
<depend>rosbag2_cpp</depend>
<depend>rosbag2_storage</depend>
<depend>rmw</depend>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ void write_benchmark_results(
}

if (new_file) {
output_file << "storage_id ";
output_file << "instances frequency message_size total_messages_sent cache_size ";
output_file << "max_bagfile_size storage_config ";
output_file << "compression compression_queue compression_threads ";
Expand All @@ -80,6 +81,7 @@ void write_benchmark_results(
int total_recorded_count = get_message_count_from_metadata(bag_config.storage_options.uri);

for (const auto & c : publisher_groups_config) {
output_file << bag_config.storage_options.storage_id << " ";
output_file << c.count << " ";
output_file << c.producer_config.frequency << " ";
output_file << c.producer_config.message_size << " ";
Expand Down

0 comments on commit 92e8aa5

Please sign in to comment.