Skip to content

Commit

Permalink
feat(spanner): control replicas/regions used in non-transactional rea…
Browse files Browse the repository at this point in the history
…ds (#13031)

Add the `DirectedReadOption` to indicate which replicas or regions
should be used for `Client::Read()`, `Client::ExecuteQuery()`, and
`Client::ProfileQuery()` calls in read-only or single-use transactions.

- The `IncludeReplicas` variant lists the replicas to try (in order) to
  process the request, and what to do if the list is exhausted without
  finding a healthy replica.

- The `ExcludeReplicas` variant lists replicas that should be excluded
  from serving the request.
  • Loading branch information
devbww authored Nov 3, 2023
1 parent dbe26c9 commit 0cfd158
Show file tree
Hide file tree
Showing 14 changed files with 565 additions and 108 deletions.
1 change: 1 addition & 0 deletions google/cloud/spanner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ add_library(
database_admin_connection.cc
database_admin_connection.h
date.h
directed_read_replicas.h
encryption_config.h
iam_updater.h
instance.cc
Expand Down
127 changes: 81 additions & 46 deletions google/cloud/spanner/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,49 +35,67 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

namespace {

// Extracts an option value as an `absl::optional`.
// Returns an option value as an `absl::optional`. If `OptionType` is
// not present in `opts`, the returned optional is empty (disengaged).
template <typename OptionType>
absl::optional<typename OptionType::Type> OptOpt(Options const& opts) {
absl::optional<typename OptionType::Type> optopt;
if (opts.has<OptionType>()) optopt = opts.get<OptionType>();
return optopt;
}

// Extracts (removes and returns) an option value from `opts`. If
// `OptionType` is not present, returns a default-constructed value.
template <typename OptionType>
typename OptionType::Type ExtractOpt(Options& opts) {
auto option = internal::ExtractOption<OptionType>(opts);
return option ? *std::move(option) : typename OptionType::Type();
}

} // namespace

RowStream Client::Read(std::string table, KeySet keys,
std::vector<std::string> columns, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
opts = internal::MergeOptions(std::move(opts), opts_);
auto directed_read_option = ExtractOpt<DirectedReadOption>(opts);
internal::OptionsSpan span(std::move(opts));
return conn_->Read({spanner_internal::MakeSingleUseTransaction(
Transaction::ReadOnlyOptions()),
std::move(table), std::move(keys), std::move(columns),
ToReadOptions(internal::CurrentOptions()),
absl::nullopt});
ToReadOptions(internal::CurrentOptions()), absl::nullopt,
false, std::move(directed_read_option)});
}

RowStream Client::Read(Transaction::SingleUseOptions transaction_options,
std::string table, KeySet keys,
std::vector<std::string> columns, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
opts = internal::MergeOptions(std::move(opts), opts_);
auto directed_read_option = ExtractOpt<DirectedReadOption>(opts);
internal::OptionsSpan span(std::move(opts));
return conn_->Read({spanner_internal::MakeSingleUseTransaction(
std::move(transaction_options)),
std::move(table), std::move(keys), std::move(columns),
ToReadOptions(internal::CurrentOptions()),
absl::nullopt});
ToReadOptions(internal::CurrentOptions()), absl::nullopt,
false, std::move(directed_read_option)});
}

RowStream Client::Read(Transaction transaction, std::string table, KeySet keys,
std::vector<std::string> columns, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
opts = internal::MergeOptions(std::move(opts), opts_);
auto directed_read_option = ExtractOpt<DirectedReadOption>(opts);
internal::OptionsSpan span(std::move(opts));
return conn_->Read({std::move(transaction), std::move(table), std::move(keys),
std::move(columns),
ToReadOptions(internal::CurrentOptions()),
absl::nullopt});
ToReadOptions(internal::CurrentOptions()), absl::nullopt,
false, std::move(directed_read_option)});
}

RowStream Client::Read(ReadPartition const& read_partition, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
return conn_->Read(spanner_internal::MakeReadParams(read_partition));
opts = internal::MergeOptions(std::move(opts), opts_);
auto directed_read_option = ExtractOpt<DirectedReadOption>(opts);
internal::OptionsSpan span(std::move(opts));
return conn_->Read(spanner_internal::MakeReadParams(
read_partition, std::move(directed_read_option)));
}

StatusOr<std::vector<ReadPartition>> Client::PartitionRead(
Expand All @@ -87,70 +105,87 @@ StatusOr<std::vector<ReadPartition>> Client::PartitionRead(
return conn_->PartitionRead(
{{std::move(transaction), std::move(table), std::move(keys),
std::move(columns), ToReadOptions(internal::CurrentOptions()),
absl::nullopt},
absl::nullopt, false, DirectedReadOption::Type{}},
ToPartitionOptions(internal::CurrentOptions())});
}

RowStream Client::ExecuteQuery(SqlStatement statement, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
return conn_->ExecuteQuery({spanner_internal::MakeSingleUseTransaction(
Transaction::ReadOnlyOptions()),
std::move(statement),
QueryOptions(internal::CurrentOptions()),
absl::nullopt});
opts = internal::MergeOptions(std::move(opts), opts_);
auto directed_read_option = ExtractOpt<DirectedReadOption>(opts);
internal::OptionsSpan span(std::move(opts));
return conn_->ExecuteQuery(
{spanner_internal::MakeSingleUseTransaction(
Transaction::ReadOnlyOptions()),
std::move(statement), QueryOptions(internal::CurrentOptions()),
absl::nullopt, false, std::move(directed_read_option)});
}

RowStream Client::ExecuteQuery(
Transaction::SingleUseOptions transaction_options, SqlStatement statement,
Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
return conn_->ExecuteQuery({spanner_internal::MakeSingleUseTransaction(
std::move(transaction_options)),
std::move(statement),
QueryOptions(internal::CurrentOptions()),
absl::nullopt});
opts = internal::MergeOptions(std::move(opts), opts_);
auto directed_read_option = ExtractOpt<DirectedReadOption>(opts);
internal::OptionsSpan span(std::move(opts));
return conn_->ExecuteQuery(
{spanner_internal::MakeSingleUseTransaction(
std::move(transaction_options)),
std::move(statement), QueryOptions(internal::CurrentOptions()),
absl::nullopt, false, std::move(directed_read_option)});
}

RowStream Client::ExecuteQuery(Transaction transaction, SqlStatement statement,
Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
opts = internal::MergeOptions(std::move(opts), opts_);
auto directed_read_option = ExtractOpt<DirectedReadOption>(opts);
internal::OptionsSpan span(std::move(opts));
return conn_->ExecuteQuery({std::move(transaction), std::move(statement),
QueryOptions(internal::CurrentOptions()),
absl::nullopt});
absl::nullopt, false,
std::move(directed_read_option)});
}

RowStream Client::ExecuteQuery(QueryPartition const& partition, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
opts = internal::MergeOptions(std::move(opts), opts_);
auto directed_read_option = ExtractOpt<DirectedReadOption>(opts);
internal::OptionsSpan span(std::move(opts));
return conn_->ExecuteQuery(spanner_internal::MakeSqlParams(
partition, QueryOptions(internal::CurrentOptions())));
partition, QueryOptions(internal::CurrentOptions()),
std::move(directed_read_option)));
}

ProfileQueryResult Client::ProfileQuery(SqlStatement statement, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
return conn_->ProfileQuery({spanner_internal::MakeSingleUseTransaction(
Transaction::ReadOnlyOptions()),
std::move(statement),
QueryOptions(internal::CurrentOptions()),
absl::nullopt});
opts = internal::MergeOptions(std::move(opts), opts_);
auto directed_read_option = ExtractOpt<DirectedReadOption>(opts);
internal::OptionsSpan span(std::move(opts));
return conn_->ProfileQuery(
{spanner_internal::MakeSingleUseTransaction(
Transaction::ReadOnlyOptions()),
std::move(statement), QueryOptions(internal::CurrentOptions()),
absl::nullopt, false, std::move(directed_read_option)});
}

ProfileQueryResult Client::ProfileQuery(
Transaction::SingleUseOptions transaction_options, SqlStatement statement,
Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
return conn_->ProfileQuery({spanner_internal::MakeSingleUseTransaction(
std::move(transaction_options)),
std::move(statement),
QueryOptions(internal::CurrentOptions()),
absl::nullopt});
opts = internal::MergeOptions(std::move(opts), opts_);
auto directed_read_option = ExtractOpt<DirectedReadOption>(opts);
internal::OptionsSpan span(std::move(opts));
return conn_->ProfileQuery(
{spanner_internal::MakeSingleUseTransaction(
std::move(transaction_options)),
std::move(statement), QueryOptions(internal::CurrentOptions()),
absl::nullopt, false, std::move(directed_read_option)});
}

ProfileQueryResult Client::ProfileQuery(Transaction transaction,
SqlStatement statement, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
opts = internal::MergeOptions(std::move(opts), opts_);
auto directed_read_option = ExtractOpt<DirectedReadOption>(opts);
internal::OptionsSpan span(std::move(opts));
return conn_->ProfileQuery({std::move(transaction), std::move(statement),
QueryOptions(internal::CurrentOptions()),
absl::nullopt});
absl::nullopt, false,
std::move(directed_read_option)});
}

StatusOr<std::vector<QueryPartition>> Client::PartitionQuery(
Expand All @@ -166,7 +201,7 @@ StatusOr<DmlResult> Client::ExecuteDml(Transaction transaction,
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
return conn_->ExecuteDml({std::move(transaction), std::move(statement),
QueryOptions(internal::CurrentOptions()),
absl::nullopt});
absl::nullopt, false, DirectedReadOption::Type{}});
}

StatusOr<ProfileDmlResult> Client::ProfileDml(Transaction transaction,
Expand All @@ -175,7 +210,7 @@ StatusOr<ProfileDmlResult> Client::ProfileDml(Transaction transaction,
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
return conn_->ProfileDml({std::move(transaction), std::move(statement),
QueryOptions(internal::CurrentOptions()),
absl::nullopt});
absl::nullopt, false, DirectedReadOption::Type{}});
}

StatusOr<ExecutionPlan> Client::AnalyzeSql(Transaction transaction,
Expand All @@ -184,7 +219,7 @@ StatusOr<ExecutionPlan> Client::AnalyzeSql(Transaction transaction,
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
return conn_->AnalyzeSql({std::move(transaction), std::move(statement),
QueryOptions(internal::CurrentOptions()),
absl::nullopt});
absl::nullopt, false, DirectedReadOption::Type{}});
}

StatusOr<BatchDmlResult> Client::ExecuteBatchDml(
Expand Down
Loading

0 comments on commit 0cfd158

Please sign in to comment.