Skip to content

Commit

Permalink
feat(spanner): add the final pieces for the RouteToLeaderOption (#11112)
Browse files Browse the repository at this point in the history
The final glue to add a routing header to Spanner RPCs that should be
served in the leader region.

For the time being, the boolean option defaults to off (set as false).  See
#11111.

`${GOOGLE_CLOUD_CPP_SPANNER_ROUTE_TO_LEADER}` overrides any
code setting for the `RouteToLeaderOption`.
  • Loading branch information
devbww authored Mar 28, 2023
1 parent 5bd8807 commit bedd265
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "google/cloud/spanner/benchmarks/benchmarks_config.h"
#include "google/cloud/spanner/client.h"
#include "google/cloud/spanner/internal/defaults.h"
#include "google/cloud/spanner/internal/route_to_leader.h"
#include "google/cloud/spanner/internal/session_pool.h"
#include "google/cloud/spanner/internal/spanner_stub.h"
#include "google/cloud/spanner/testing/pick_random_instance.h"
Expand Down Expand Up @@ -539,6 +540,7 @@ class ReadExperiment : public BasicExperiment<Traits> {
Status last_status;
for (int i = 0; i != 10; ++i) {
grpc::ClientContext context;
spanner_internal::RouteToLeader(context); // always for CreateSession
google::spanner::v1::CreateSessionRequest request{};
request.set_database(database.FullName());
auto response = stub->CreateSession(context, request);
Expand Down Expand Up @@ -678,6 +680,7 @@ class SelectExperiment : public BasicExperiment<Traits> {
Status last_status;
for (int i = 0; i != ExperimentImpl<Traits>::kColumnCount; ++i) {
grpc::ClientContext context;
spanner_internal::RouteToLeader(context); // always for CreateSession
google::spanner::v1::CreateSessionRequest request{};
request.set_database(database.FullName());
auto response = stub->CreateSession(context, request);
Expand Down Expand Up @@ -837,6 +840,7 @@ class UpdateExperiment : public BasicExperiment<Traits> {
Status last_status;
for (int i = 0; i != 10; ++i) {
grpc::ClientContext context;
spanner_internal::RouteToLeader(context); // always for CreateSession
google::spanner::v1::CreateSessionRequest request{};
request.set_database(database.FullName());
auto response = stub->CreateSession(context, request);
Expand Down Expand Up @@ -1022,6 +1026,7 @@ class MutationExperiment : public BasicExperiment<Traits> {
Status last_status;
for (int i = 0; i != 10; ++i) {
grpc::ClientContext context;
spanner_internal::RouteToLeader(context); // always for CreateSession
google::spanner::v1::CreateSessionRequest request{};
request.set_database(database.FullName());
auto response = stub->CreateSession(context, request);
Expand Down
31 changes: 23 additions & 8 deletions google/cloud/spanner/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "google/cloud/spanner/internal/logging_result_set_reader.h"
#include "google/cloud/spanner/internal/partial_result_set_resume.h"
#include "google/cloud/spanner/internal/partial_result_set_source.h"
#include "google/cloud/spanner/internal/route_to_leader.h"
#include "google/cloud/spanner/internal/status_utils.h"
#include "google/cloud/spanner/options.h"
#include "google/cloud/spanner/query_partition.h"
Expand Down Expand Up @@ -387,8 +388,10 @@ StatusOr<google::spanner::v1::Transaction> ConnectionImpl::BeginTransaction(
auto response = RetryLoop(
RetryPolicyPrototype()->clone(), BackoffPolicyPrototype()->clone(),
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
google::spanner::v1::BeginTransactionRequest const& request) {
[&stub, route_to_leader = ctx.route_to_leader](
grpc::ClientContext& context,
google::spanner::v1::BeginTransactionRequest const& request) {
if (route_to_leader) RouteToLeader(context);
return stub->BeginTransaction(context, request);
},
begin, func);
Expand Down Expand Up @@ -442,11 +445,13 @@ spanner::RowStream ConnectionImpl::ReadImpl(
auto stub = session_pool_->GetStub(*session);
auto const tracing_enabled = RpcStreamTracingEnabled();
auto const& tracing_options = RpcTracingOptions();
auto factory = [stub, request, tracing_enabled,
auto factory = [stub, request, route_to_leader = ctx.route_to_leader,
tracing_enabled,
tracing_options](std::string const& resume_token) mutable {
if (!resume_token.empty()) request->set_resume_token(resume_token);
auto context = std::make_shared<grpc::ClientContext>();
internal::ConfigureContext(*context, internal::CurrentOptions());
if (route_to_leader) RouteToLeader(*context);
auto grpc_reader = stub->StreamingRead(*context, *request);
std::unique_ptr<PartialResultSetReader> reader =
std::make_unique<DefaultPartialResultSetReader>(std::move(context),
Expand Down Expand Up @@ -526,6 +531,7 @@ StatusOr<std::vector<spanner::ReadPartition>> ConnectionImpl::PartitionReadImpl(
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
google::spanner::v1::PartitionReadRequest const& request) {
RouteToLeader(context); // always for PartitionRead()
return stub->PartitionRead(context, request);
},
request, __func__);
Expand Down Expand Up @@ -668,14 +674,16 @@ ResultType ConnectionImpl::CommonQueryImpl(
auto const tracing_enabled = RpcStreamTracingEnabled();
auto const& tracing_options = RpcTracingOptions();
auto retry_resume_fn =
[stub, retry_policy_prototype, backoff_policy_prototype, tracing_enabled,
[stub, retry_policy_prototype, backoff_policy_prototype,
route_to_leader = ctx.route_to_leader, tracing_enabled,
tracing_options](google::spanner::v1::ExecuteSqlRequest& request) mutable
-> StatusOr<std::unique_ptr<ResultSourceInterface>> {
auto factory = [stub, request, tracing_enabled,
auto factory = [stub, request, route_to_leader, tracing_enabled,
tracing_options](std::string const& resume_token) mutable {
if (!resume_token.empty()) request.set_resume_token(resume_token);
auto context = std::make_shared<grpc::ClientContext>();
internal::ConfigureContext(*context, internal::CurrentOptions());
if (route_to_leader) RouteToLeader(*context);
auto grpc_reader = stub->ExecuteStreamingSql(*context, request);
std::unique_ptr<PartialResultSetReader> reader =
std::make_unique<DefaultPartialResultSetReader>(
Expand Down Expand Up @@ -745,13 +753,16 @@ StatusOr<ResultType> ConnectionImpl::CommonDmlImpl(

auto retry_resume_fn =
[function_name, stub, retry_policy_prototype, backoff_policy_prototype,
session](google::spanner::v1::ExecuteSqlRequest& request) mutable
session, route_to_leader = ctx.route_to_leader](
google::spanner::v1::ExecuteSqlRequest& request) mutable
-> StatusOr<std::unique_ptr<ResultSourceInterface>> {
StatusOr<google::spanner::v1::ResultSet> response = RetryLoop(
retry_policy_prototype->clone(), backoff_policy_prototype->clone(),
Idempotency::kIdempotent,
[stub](grpc::ClientContext& context,
google::spanner::v1::ExecuteSqlRequest const& request) {
[stub, route_to_leader](
grpc::ClientContext& context,
google::spanner::v1::ExecuteSqlRequest const& request) {
if (route_to_leader) RouteToLeader(context);
return stub->ExecuteSql(context, request);
},
request, function_name);
Expand Down Expand Up @@ -830,6 +841,7 @@ ConnectionImpl::PartitionQueryImpl(
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
google::spanner::v1::PartitionQueryRequest const& request) {
RouteToLeader(context); // always for PartitionQuery()
return stub->PartitionQuery(context, request);
},
request, __func__);
Expand Down Expand Up @@ -904,6 +916,7 @@ StatusOr<spanner::BatchDmlResult> ConnectionImpl::ExecuteBatchDmlImpl(
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
google::spanner::v1::ExecuteBatchDmlRequest const& request) {
RouteToLeader(context); // always for ExecuteBatchDml()
return stub->ExecuteBatchDml(context, request);
},
request, __func__);
Expand Down Expand Up @@ -1027,6 +1040,7 @@ StatusOr<spanner::CommitResult> ConnectionImpl::CommitImpl(
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
google::spanner::v1::CommitRequest const& request) {
RouteToLeader(context); // always for Commit()
return stub->Commit(context, request);
},
request, __func__);
Expand Down Expand Up @@ -1090,6 +1104,7 @@ Status ConnectionImpl::RollbackImpl(
Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
google::spanner::v1::RollbackRequest const& request) {
RouteToLeader(context); // always for Rollback()
return stub->Rollback(context, request);
},
request, __func__);
Expand Down
23 changes: 15 additions & 8 deletions google/cloud/spanner/internal/defaults.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,22 @@ Options DefaultOptions(Options opts) {
min_sessions =
(std::min)(min_sessions, max_sessions_per_channel * num_channels);

// TODO(#11111): Restore on-by-default behavior.
if (!opts.has<spanner::RouteToLeaderOption>()) {
if (auto e = internal::GetEnv("GOOGLE_CLOUD_CPP_SPANNER_ROUTE_TO_LEADER")) {
for (auto const* disable : {"N", "n", "F", "f", "0", "off"}) {
if (*e == disable) {
// Change the default from "for RW/PartitionedDml transactions"
// to "never".
opts.set<spanner::RouteToLeaderOption>(false);
break;
}
opts.set<spanner::RouteToLeaderOption>(false); // off by default
}
// ${GOOGLE_CLOUD_CPP_SPANNER_ROUTE_TO_LEADER} overrides option setting.
if (auto e = internal::GetEnv("GOOGLE_CLOUD_CPP_SPANNER_ROUTE_TO_LEADER")) {
for (auto const* disable : {"N", "n", "F", "f", "0", "off"}) {
if (*e == disable) {
// Never route to leader.
opts.set<spanner::RouteToLeaderOption>(false);
}
}
for (auto const* enable : {"Y", "y", "T", "t", "1", "on"}) {
if (*e == enable) {
// Route to leader for RW/PartitionedDml transactions.
opts.unset<spanner::RouteToLeaderOption>();
}
}
}
Expand Down
13 changes: 11 additions & 2 deletions google/cloud/spanner/internal/defaults_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ TEST(Options, Defaults) {
EXPECT_TRUE(opts.has<SpannerBackoffPolicyOption>());
EXPECT_TRUE(opts.has<spanner_internal::SessionPoolClockOption>());

EXPECT_FALSE(opts.has<spanner::RouteToLeaderOption>());
// TODO(#11111): Restore on-by-default behavior.
ASSERT_TRUE(opts.has<spanner::RouteToLeaderOption>());
EXPECT_FALSE(opts.get<spanner::RouteToLeaderOption>());
}

TEST(Options, AdminDefaults) {
Expand Down Expand Up @@ -154,14 +156,21 @@ TEST(Options, SpannerEmulatorHost) {
EXPECT_NE(opts.get<GrpcCredentialOption>(), nullptr);
}

TEST(Options, RouteToLeaderFromEnv) {
TEST(Options, RouteToLeaderFromEnvOff) {
testing_util::ScopedEnvironment route_to_leader_env(
"GOOGLE_CLOUD_CPP_SPANNER_ROUTE_TO_LEADER", "off");
auto opts = spanner_internal::DefaultOptions();
EXPECT_TRUE(opts.has<spanner::RouteToLeaderOption>());
EXPECT_FALSE(opts.get<spanner::RouteToLeaderOption>());
}

TEST(Options, RouteToLeaderFromEnvOn) {
testing_util::ScopedEnvironment route_to_leader_env(
"GOOGLE_CLOUD_CPP_SPANNER_ROUTE_TO_LEADER", "on");
auto opts = spanner_internal::DefaultOptions();
EXPECT_FALSE(opts.has<spanner::RouteToLeaderOption>());
}

TEST(Options, TracingComponentsFromEnv) {
testing_util::ScopedEnvironment tracing_components_env(
"GOOGLE_CLOUD_CPP_ENABLE_TRACING", "c1,c2,c3");
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/spanner/internal/session_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/spanner/internal/session_pool.h"
#include "google/cloud/spanner/internal/route_to_leader.h"
#include "google/cloud/spanner/internal/session.h"
#include "google/cloud/spanner/internal/status_utils.h"
#include "google/cloud/spanner/options.h"
Expand Down Expand Up @@ -396,6 +397,7 @@ Status SessionPool::CreateSessionsSync(
google::cloud::Idempotency::kIdempotent,
[&stub](grpc::ClientContext& context,
google::spanner::v1::BatchCreateSessionsRequest const& request) {
RouteToLeader(context); // always for BatchCreateSessions()
return stub->BatchCreateSessions(context, request);
},
request, __func__);
Expand Down Expand Up @@ -455,6 +457,7 @@ SessionPool::AsyncBatchCreateSessions(
Idempotency::kIdempotent, cq,
[stub](CompletionQueue& cq, std::shared_ptr<grpc::ClientContext> context,
google::spanner::v1::BatchCreateSessionsRequest const& request) {
RouteToLeader(*context); // always for BatchCreateSessions()
return stub->AsyncBatchCreateSessions(cq, std::move(context), request);
},
std::move(request), __func__);
Expand Down Expand Up @@ -491,6 +494,7 @@ SessionPool::AsyncRefreshSession(CompletionQueue& cq,
Idempotency::kIdempotent, cq,
[stub](CompletionQueue& cq, std::shared_ptr<grpc::ClientContext> context,
google::spanner::v1::ExecuteSqlRequest const& request) {
// Read-only transaction, so no route-to-leader.
return stub->AsyncExecuteSql(cq, std::move(context), request);
},
std::move(request), __func__);
Expand Down

0 comments on commit bedd265

Please sign in to comment.