Skip to content

Commit

Permalink
test(storage): benchmark supports ranged reads (#9378)
Browse files Browse the repository at this point in the history
Many analytical workloads use "ranged" reads, i.e., they read a
portion of a larger object. There seem to be performance advantages
when reading up to 2 MiB. Likewise, there are measurable performance
differences when the offset is on a 2 MiB boundary and/or the read
size is on a 2 MiB boundary.

We would like to measure these effects, and therefore we need new
parameters for the benchmark.

Since we now have 4 of these "quantized" size ranges, I did some
refactoring to avoid code duplication.
  • Loading branch information
coryan authored Jun 29, 2022
1 parent 9effcb2 commit 55f0da0
Show file tree
Hide file tree
Showing 10 changed files with 277 additions and 108 deletions.
157 changes: 99 additions & 58 deletions google/cloud/storage/benchmarks/storage_throughput_vs_cpu_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "google/cloud/internal/getenv.h"
#include "google/cloud/internal/random.h"
#include "google/cloud/log.h"
#include "absl/time/time.h"
#include <functional>
#include <future>
#include <set>
Expand Down Expand Up @@ -149,52 +150,66 @@ int main(int argc, char* argv[]) {
}
};

auto output_size_range = [](std::string const& name, auto minimum,
auto maximum) {
std::cout << "\n# " << name << " Range: [" << minimum << ',' << maximum
<< ']';
};

auto output_quantized_range = [](std::string const& name, auto minimum,
auto maximum, auto quantum) {
std::cout << "\n# " << name << " Range: [" << minimum << ',' << maximum
<< "]\n# " << name << " Quantum: " << quantum;
};

std::cout << "# Running test on bucket: " << bucket_name << "\n# Start time: "
<< google::cloud::internal::FormatRfc3339(
std::chrono::system_clock::now())
<< "\n# Region: " << options->region
<< "\n# Duration: " << options->duration.count() << "s"
<< "\n# Region: " << options->region << "\n# Duration: "
<< absl::FormatDuration(absl::FromChrono(options->duration))
<< "\n# Thread Count: " << options->thread_count
<< "\n# Client Per Thread: " << options->client_per_thread
<< "\n# gRPC Channel Count: " << options->grpc_channel_count
<< "\n# DirectPath Channel Count: "
<< options->direct_path_channel_count << "\n# Object Size Range: ["
<< options->minimum_object_size << ","
<< options->maximum_object_size << "]\n# Write Buffer Size Range: ["
<< options->minimum_write_buffer_size << ","
<< options->maximum_write_buffer_size
<< "]\n# Write Buffer Quantum: " << options->write_buffer_quantum
<< "\n# Read Buffer Size Range: ["
<< options->minimum_read_buffer_size << ","
<< options->maximum_read_buffer_size
<< "]\n# Read Buffer Quantum: " << options->read_buffer_quantum
<< "\n# Object Buffer Size Range (MiB): ["
<< options->minimum_object_size / gcs_bm::kMiB << ","
<< options->maximum_object_size / gcs_bm::kMiB
<< "]\n# Write Buffer Size Range (KiB): ["
<< options->minimum_write_buffer_size / gcs_bm::kKiB << ","
<< options->maximum_write_buffer_size / gcs_bm::kKiB
<< "]\n# Write Buffer Quantum (KiB): "
<< options->write_buffer_quantum / gcs_bm::kKiB
<< "\n# Read Buffer Size Range (KiB): ["
<< options->minimum_read_buffer_size / gcs_bm::kKiB << ","
<< options->maximum_read_buffer_size / gcs_bm::kKiB
<< "]\n# Read Buffer Quantum (KiB): "
<< options->read_buffer_quantum / gcs_bm::kKiB
<< "\n# Minimum Sample Count: " << options->minimum_sample_count
<< "\n# Maximum Sample Count: " << options->maximum_sample_count
<< "\n# Enabled Libs: "
<< absl::StrJoin(options->libs, ",", Formatter{})
<< "\n# Enabled Transports: "
<< absl::StrJoin(options->transports, ",", Formatter{})
<< "\n# Enabled CRC32C: "
<< absl::StrJoin(options->enabled_crc32c, ",", Formatter{})
<< "\n# Enabled MD5: "
<< absl::StrJoin(options->enabled_md5, ",", Formatter{})
<< "\n# REST Endpoint: " << options->rest_endpoint
<< "\n# Grpc Endpoint: " << options->grpc_endpoint
<< "\n# Direct Path Endpoint: " << options->direct_path_endpoint
<< "\n# Build info: " << notes << "\n";
<< options->direct_path_channel_count;

output_size_range("Object Size", options->minimum_object_size,
options->maximum_object_size);
output_quantized_range(
"Write Buffer Size", options->minimum_write_buffer_size,
options->maximum_write_buffer_size, options->write_buffer_quantum);
output_quantized_range("Read Buffer Size", options->minimum_read_buffer_size,
options->maximum_read_buffer_size,
options->read_buffer_quantum);

std::cout
<< "\n# Minimum Sample Count: " << options->minimum_sample_count
<< "\n# Maximum Sample Count: " << options->maximum_sample_count
<< "\n# Enabled Libs: " << absl::StrJoin(options->libs, ",", Formatter{})
<< "\n# Enabled Transports: "
<< absl::StrJoin(options->transports, ",", Formatter{})
<< "\n# Enabled CRC32C: "
<< absl::StrJoin(options->enabled_crc32c, ",", Formatter{})
<< "\n# Enabled MD5: "
<< absl::StrJoin(options->enabled_md5, ",", Formatter{})
<< "\n# REST Endpoint: " << options->rest_endpoint
<< "\n# Grpc Endpoint: " << options->grpc_endpoint
<< "\n# Direct Path Endpoint: " << options->direct_path_endpoint
<< "\n# Transfer Stall Timeout: "
<< absl::FormatDuration(absl::FromChrono(options->transfer_stall_timeout))
<< "\n# Download Stall Timeout: "
<< absl::FormatDuration(absl::FromChrono(options->download_stall_timeout))
<< "\n# Minimum Sample Delay: "
<< absl::FormatDuration(absl::FromChrono(options->minimum_sample_delay));

output_quantized_range("Read Offset", options->minimum_read_offset,
options->maximum_read_offset,
options->read_offset_quantum);
output_quantized_range("Read Size", options->minimum_read_size,
options->maximum_read_size,
options->read_size_quantum);

std::cout << "\n# Build info: " << notes << "\n";
// Make the output generated so far immediately visible, helps with debugging.
std::cout << std::flush;

Expand Down Expand Up @@ -331,18 +346,45 @@ void RunThread(ThroughputOptions const& options, std::string const& bucket_name,

std::uniform_int_distribution<std::int64_t> size_generator(
options.minimum_object_size, options.maximum_object_size);
std::uniform_int_distribution<std::size_t> write_buffer_size_generator(
options.minimum_write_buffer_size / options.write_buffer_quantum,
options.maximum_write_buffer_size / options.write_buffer_quantum);
std::uniform_int_distribution<std::size_t> read_buffer_size_generator(
options.minimum_read_buffer_size / options.read_buffer_quantum,
options.maximum_read_buffer_size / options.read_buffer_quantum);

auto quantized_range_generator = [](auto minimum, auto maximum,
auto quantum) {
auto distribution = std::uniform_int_distribution<decltype(quantum)>(
minimum / quantum, maximum / quantum);
return [d = std::move(distribution), quantum](auto& g) mutable {
return quantum * d(g);
};
};

auto write_buffer_size_generator = quantized_range_generator(
options.minimum_write_buffer_size, options.maximum_write_buffer_size,
options.write_buffer_quantum);
auto read_buffer_size_generator = quantized_range_generator(
options.minimum_read_buffer_size, options.maximum_read_buffer_size,
options.read_buffer_quantum);
auto read_offset_generator = quantized_range_generator(
options.minimum_read_offset, options.maximum_read_offset,
options.read_offset_quantum);
auto read_size_generator = quantized_range_generator(
options.minimum_read_size, options.maximum_read_size,
options.read_size_quantum);

auto const read_range_enabled =
options.minimum_read_size != options.maximum_read_size;
auto read_range_generator = [&](auto& g, std::int64_t object_size)
-> absl::optional<std::pair<std::int64_t, std::int64_t>> {
if (!read_range_enabled || !std::bernoulli_distribution{}(g)) {
return absl::nullopt;
}
auto offset = (std::min)(object_size, read_offset_generator(g));
auto size = (std::min)(object_size - offset, read_size_generator(g));
return std::make_pair(offset, size);
};

std::uniform_int_distribution<std::size_t> crc32c_generator(
0, options.enabled_crc32c.size() - 1);
std::uniform_int_distribution<std::size_t> md5_generator(
0, options.enabled_crc32c.size() - 1);
std::bernoulli_distribution use_insert;

auto deadline = std::chrono::steady_clock::now() + options.duration;

Expand All @@ -351,21 +393,20 @@ void RunThread(ThroughputOptions const& options, std::string const& bucket_name,
iteration_count < options.maximum_sample_count &&
(iteration_count < options.minimum_sample_count || start < deadline);
start = std::chrono::steady_clock::now(), ++iteration_count) {
auto object_name = gcs_bm::MakeRandomObjectName(generator);
auto object_size = size_generator(generator);
auto write_buffer_size =
options.write_buffer_quantum * write_buffer_size_generator(generator);
auto read_buffer_size =
options.read_buffer_quantum * read_buffer_size_generator(generator);
auto const object_name = gcs_bm::MakeRandomObjectName(generator);
auto const object_size = size_generator(generator);
auto const write_buffer_size = write_buffer_size_generator(generator);
auto const read_buffer_size = read_buffer_size_generator(generator);
bool const enable_crc = options.enabled_crc32c[crc32c_generator(generator)];
bool const enable_md5 = options.enabled_md5[md5_generator(generator)];
auto const range = read_range_generator(generator, object_size);

auto& uploader = uploaders[uploader_generator(generator)];
auto upload_result =
uploader->Run(bucket_name, object_name,
gcs_bm::ThroughputExperimentConfig{
gcs_bm::kOpWrite, object_size, write_buffer_size,
enable_crc, enable_md5});
auto upload_result = uploader->Run(
bucket_name, object_name,
gcs_bm::ThroughputExperimentConfig{
gcs_bm::kOpWrite, object_size, write_buffer_size, enable_crc,
enable_md5, /*read_range=*/absl::nullopt});
auto status = upload_result.status;
handler(std::move(upload_result));

Expand All @@ -376,7 +417,7 @@ void RunThread(ThroughputOptions const& options, std::string const& bucket_name,
handler(downloader->Run(
bucket_name, object_name,
gcs_bm::ThroughputExperimentConfig{op, object_size, read_buffer_size,
enable_crc, enable_md5}));
enable_crc, enable_md5, range}));
}
auto client = provider(ExperimentTransport::kJson);
(void)client.DeleteObject(bucket_name, object_name);
Expand Down
12 changes: 11 additions & 1 deletion google/cloud/storage/benchmarks/throughput_experiment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class UploadObject : public ThroughputExperiment {
kOpInsert,
start,
config.object_size,
/*transfer_offset=*/0,
config.object_size,
config.app_buffer_size,
config.enable_crc32c,
Expand Down Expand Up @@ -121,6 +122,7 @@ class UploadObject : public ThroughputExperiment {
kOpWrite,
start,
config.object_size,
/*transfer_offset=*/0,
config.object_size,
config.app_buffer_size,
config.enable_crc32c,
Expand Down Expand Up @@ -163,8 +165,13 @@ class DownloadObject : public ThroughputExperiment {

auto const start = std::chrono::system_clock::now();
auto timer = Timer::PerThread();
auto const offset = config.read_range.value_or(std::make_pair(0, 0)).first;
auto read_range =
config.read_range.has_value()
? gcs::ReadRange(offset, offset + config.read_range->second)
: gcs::ReadRange();
auto reader = client_.ReadObject(
bucket_name, object_name,
bucket_name, object_name, read_range,
gcs::DisableCrc32cChecksum(!config.enable_crc32c),
gcs::DisableMD5Hash(!config.enable_md5), api_selector);
std::int64_t transfer_size = 0;
Expand All @@ -180,6 +187,7 @@ class DownloadObject : public ThroughputExperiment {
config.op,
start,
config.object_size,
offset,
transfer_size,
config.app_buffer_size,
config.enable_crc32c,
Expand Down Expand Up @@ -274,6 +282,7 @@ class DownloadObjectLibcurl : public ThroughputExperiment {
config.op,
start,
config.object_size,
/*transfer_offset=*/0,
config.object_size,
config.app_buffer_size,
config.enable_crc32c,
Expand Down Expand Up @@ -344,6 +353,7 @@ class DownloadObjectRawGrpc : public ThroughputExperiment {
config.op,
start,
config.object_size,
/*transfer_offset=*/0,
bytes_received,
config.app_buffer_size,
/*crc_enabled=*/false,
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/storage/benchmarks/throughput_experiment.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
#include "google/cloud/storage/benchmarks/benchmark_utils.h"
#include "google/cloud/storage/benchmarks/throughput_options.h"
#include "google/cloud/storage/benchmarks/throughput_result.h"
#include "absl/types/optional.h"
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

namespace google {
Expand All @@ -33,6 +36,7 @@ struct ThroughputExperimentConfig {
std::size_t app_buffer_size;
bool enable_crc32c;
bool enable_md5;
absl::optional<std::pair<std::int64_t, std::int64_t>> read_range;
};

/**
Expand Down
14 changes: 9 additions & 5 deletions google/cloud/storage/benchmarks/throughput_experiment_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ TEST_P(ThroughputExperimentIntegrationTest, Upload) {
auto experiments = CreateUploadExperiments(options, provider);
for (auto& e : experiments) {
auto object_name = MakeRandomObjectName();
ThroughputExperimentConfig config{OpType::kOpInsert, 16 * kKiB, 1 * kMiB,
/*enable_crc32c=*/false,
/*enable_md5=*/false};
ThroughputExperimentConfig config{
OpType::kOpInsert, 16 * kKiB, 1 * kMiB,
/*enable_crc32c=*/false,
/*enable_md5=*/false, absl::nullopt};
auto result = e->Run(bucket_name_, object_name, config);
ASSERT_STATUS_OK(result.status);
auto status = client->DeleteObject(bucket_name_, object_name);
Expand All @@ -95,9 +96,12 @@ TEST_P(ThroughputExperimentIntegrationTest, Download) {
auto object_name = MakeRandomObjectName();

auto constexpr kObjectSize = 16 * kKiB;
ThroughputExperimentConfig config{OpType::kOpRead0, kObjectSize, 1 * kMiB,
ThroughputExperimentConfig config{OpType::kOpRead0,
kObjectSize,
1 * kMiB,
/*enable_crc32c=*/false,
/*enable_md5=*/false};
/*enable_md5=*/false,
std::make_pair(128 * kKiB, 256 * kKiB)};

auto contents = MakeRandomData(kObjectSize);
auto insert =
Expand Down
Loading

0 comments on commit 55f0da0

Please sign in to comment.