Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): implement at-least-once Commit #11899

Merged
merged 1 commit into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions google/cloud/spanner/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,16 @@ StatusOr<CommitResult> Client::Commit(Transaction transaction,
CommitOptions(internal::CurrentOptions())});
}

StatusOr<CommitResult> Client::CommitAtLeastOnce(
Transaction::ReadWriteOptions transaction_options, Mutations mutations,
Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
return conn_->Commit({spanner_internal::MakeSingleUseCommitTransaction(
std::move(transaction_options)),
std::move(mutations),
CommitOptions(internal::CurrentOptions())});
}

Status Client::Rollback(Transaction transaction, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), opts_));
return conn_->Rollback({std::move(transaction)});
Expand Down
34 changes: 31 additions & 3 deletions google/cloud/spanner/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class Client {
* @copydoc Read
*
* @param transaction_options Execute this read in a single-use transaction
* with these options.
* with these options.
*/
RowStream Read(Transaction::SingleUseOptions transaction_options,
std::string table, KeySet keys,
Expand Down Expand Up @@ -593,8 +593,8 @@ class Client {
*
* @param transaction The transaction to commit.
* @param mutations The mutations to be executed when this transaction
* commits. All mutations are applied atomically, in the order they appear
* in this list.
* commits. All mutations are applied atomically, in the order they
* appear in this list.
* @param opts (optional) The options to use for this call.
*
* @return A `StatusOr` containing the result of the commit or error status
Expand All @@ -603,6 +603,34 @@ class Client {
StatusOr<CommitResult> Commit(Transaction transaction, Mutations mutations,
Options opts = {});

/**
* Commits a write transaction with at-least-once semantics.
*
* Apply the given mutations atomically, using a single RPC, and therefore
* without replay protection. That is, it is possible that the mutations
* will be applied more than once. If the mutations are not idempotent, this
* may lead to a failure (for example, an insert may fail with "already
* exists" even though the row did not exist before the call was made).
* Accordingly, this call may only be appropriate for idempotent, latency-
* sensitive and/or high-throughput blind writing.
*
* @note Prefer the `Commit` overloads if you want exactly-once semantics
* or want to reapply mutations after a `kAborted` error.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have a really simple example using this function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I'll add one in a follow-up.

* @param transaction_options Execute the commit in a temporary transaction
* with these options.
* @param mutations The mutations to be executed when this transaction
* commits. All mutations are applied atomically, in the order they
* appear in this list.
* @param opts (optional) The options to use for this call.
*
* @return A `StatusOr` containing the result of the commit or error status
* on failure.
*/
StatusOr<CommitResult> CommitAtLeastOnce(
Transaction::ReadWriteOptions transaction_options, Mutations mutations,
Options opts = {});

/**
* Rolls back a read-write transaction, releasing any locks it holds.
*
Expand Down
50 changes: 49 additions & 1 deletion google/cloud/spanner/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ MATCHER_P(HasTag, value, "bound to expected transaction tag") {
});
}

MATCHER(HasBegin, "not bound to a transaction-id nor invalidated") {
MATCHER(HasBegin, "bound to a new (begin) transaction") {
return spanner_internal::Visit(
arg, [&](spanner_internal::SessionHolder&,
StatusOr<google::spanner::v1::TransactionSelector>& s,
Expand All @@ -730,6 +730,27 @@ MATCHER(HasBegin, "not bound to a transaction-id nor invalidated") {
});
}

MATCHER(HasSingleUse, "bound to a temporary (single-use) transaction") {
return spanner_internal::Visit(
arg, [&](spanner_internal::SessionHolder&,
StatusOr<google::spanner::v1::TransactionSelector>& s,
spanner_internal::TransactionContext const&) {
if (!s) {
*result_listener << "has status " << s.status();
return false;
}
if (!s->has_single_use()) {
if (s->has_begin()) {
*result_listener << "is begin";
} else {
*result_listener << "has transaction-id " << s->id();
}
return false;
}
return true;
});
}

bool SetSessionName(Transaction const& txn, std::string name) {
return spanner_internal::Visit(
txn, [&name](spanner_internal::SessionHolder& session,
Expand Down Expand Up @@ -930,6 +951,33 @@ TEST(ClientTest, CommitStats) {
EXPECT_EQ(42, result->commit_stats->mutation_count);
}

TEST(ClientTest, CommitAtLeastOnce) {
auto timestamp =
spanner_internal::TimestampFromRFC3339("2023-06-02T07:36:52.808Z");
ASSERT_STATUS_OK(timestamp);
auto mutation = MakeDeleteMutation("table", KeySet::All());
std::string const transaction_tag = "app=cart,env=dev";

auto conn = std::make_shared<MockConnection>();
EXPECT_CALL(*conn, Commit)
.WillOnce([&mutation, &transaction_tag,
&timestamp](Connection::CommitParams const& cp) {
EXPECT_THAT(cp.transaction, HasSingleUse());
EXPECT_EQ(cp.mutations, Mutations{mutation});
EXPECT_FALSE(cp.options.return_stats());
EXPECT_FALSE(cp.options.request_priority().has_value());
EXPECT_EQ(cp.options.transaction_tag(), transaction_tag);
return CommitResult{*timestamp, absl::nullopt};
});

Client client(conn);
auto result = client.CommitAtLeastOnce(
Transaction::ReadWriteOptions{}, {mutation},
Options{}.set<TransactionTagOption>(transaction_tag));
ASSERT_STATUS_OK(result);
EXPECT_EQ(*timestamp, result->commit_timestamp);
}

TEST(ClientTest, ProfileQuerySuccess) {
auto conn = std::make_shared<MockConnection>();
Client client(conn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,52 @@ TEST_F(ClientIntegrationTest, Commit) {
EXPECT_THAT(ids, UnorderedElementsAre(100, 199));
}

/// @test Verify the basics of CommitAtLeastOnce().
TEST_F(ClientIntegrationTest, CommitAtLeastOnce) {
// Insert SingerIds 200, 202, and 299.
auto isb =
InsertMutationBuilder("Singers", {"SingerId", "FirstName", "LastName"})
.EmplaceRow(200, "first-name-200", "last-name-200")
.EmplaceRow(202, "first-name-202", "last-name-202")
.EmplaceRow(299, "first-name-299", "last-name-299");
auto insert_result = client_->CommitAtLeastOnce(
Transaction::ReadWriteOptions{}, Mutations{isb.Build()});
if (insert_result) {
EXPECT_NE(Timestamp{}, insert_result->commit_timestamp);
} else {
if (insert_result.status().code() == StatusCode::kAborted) {
return; // try another day
}
// A replay will make it look like the row already exists.
EXPECT_THAT(insert_result, StatusIs(StatusCode::kAlreadyExists));
}

// Delete SingerId 202.
auto delete_result = client_->CommitAtLeastOnce(
Transaction::ReadWriteOptions{},
Mutations{MakeDeleteMutation("Singers", KeySet().AddKey(MakeKey(202)))});
if (delete_result) {
EXPECT_LT(insert_result->commit_timestamp, delete_result->commit_timestamp);
} else {
if (delete_result.status().code() == StatusCode::kAborted) {
return; // try another day
}
// A replay will make it look like the row doesn't exist.
EXPECT_THAT(delete_result, StatusIs(StatusCode::kNotFound));
}

// Read SingerIds [200 ... 300).
using RowType = std::tuple<std::int64_t>;
std::vector<std::int64_t> ids;
auto ks = KeySet().AddRange(MakeKeyBoundClosed(200), MakeKeyBoundOpen(300));
auto rows = client_->Read("Singers", std::move(ks), {"SingerId"});
for (auto const& row : StreamOf<RowType>(rows)) {
EXPECT_STATUS_OK(row);
if (row) ids.push_back(std::get<0>(*row));
}
EXPECT_THAT(ids, UnorderedElementsAre(200, 299));
}

/// @test Test various forms of ExecuteQuery() and ExecuteDml()
TEST_F(ClientIntegrationTest, ExecuteQueryDml) {
auto& client = *client_;
Expand Down
30 changes: 21 additions & 9 deletions google/cloud/spanner/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1031,17 +1031,29 @@ StatusOr<spanner::CommitResult> ConnectionImpl::CommitImpl(
// (for a user-supplied transaction).
request.mutable_request_options()->set_transaction_tag(ctx.tag);

if (s->selector_case() != google::spanner::v1::TransactionSelector::kId) {
auto begin =
BeginTransaction(session, s->has_begin() ? s->begin() : s->single_use(),
std::string(), ctx, __func__);
if (!begin.ok()) {
s = begin.status(); // invalidate the transaction
return begin.status();
switch (s->selector_case()) {
case google::spanner::v1::TransactionSelector::kSingleUse: {
*request.mutable_single_use_transaction() = s->single_use();
break;
}
s->set_id(begin->id());
case google::spanner::v1::TransactionSelector::kBegin: {
auto begin =
BeginTransaction(session, s->begin(), std::string(), ctx, __func__);
if (!begin.ok()) {
s = begin.status(); // invalidate the transaction
return begin.status();
}
s->set_id(begin->id());
request.set_transaction_id(s->id());
break;
}
case google::spanner::v1::TransactionSelector::kId: {
request.set_transaction_id(s->id());
break;
}
default:
return Status(StatusCode::kInternal, "TransactionSelector state error");
}
request.set_transaction_id(s->id());

auto stub = session_pool_->GetStub(*session);
auto response = RetryLoop(
Expand Down
34 changes: 34 additions & 0 deletions google/cloud/spanner/internal/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2351,6 +2351,40 @@ TEST(ConnectionImplTest, CommitSuccessWithStats) {
EXPECT_EQ(42, commit->commit_stats->mutation_count);
}

TEST(ConnectionImplTest, CommitAtLeastOnce) {
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
auto db = spanner::Database("placeholder_project", "placeholder_instance",
"placeholder_database_id");
EXPECT_CALL(*mock, BatchCreateSessions(_, HasDatabase(db)))
.WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
EXPECT_CALL(*mock, BeginTransaction).Times(0); // The whole point!
auto const commit_timestamp =
spanner::MakeTimestamp(std::chrono::system_clock::from_time_t(123))
.value();
EXPECT_CALL(*mock, Commit)
.WillOnce([commit_timestamp](
grpc::ClientContext&,
google::spanner::v1::CommitRequest const& request) {
EXPECT_EQ("test-session-name", request.session());
EXPECT_TRUE(request.has_single_use_transaction());
EXPECT_EQ(0, request.mutations_size());
EXPECT_FALSE(request.return_commit_stats());
EXPECT_EQ(google::spanner::v1::RequestOptions::PRIORITY_UNSPECIFIED,
request.request_options().priority());
EXPECT_THAT(request.request_options().request_tag(), IsEmpty());
EXPECT_THAT(request.request_options().transaction_tag(), IsEmpty());
return MakeCommitResponse(commit_timestamp);
});

auto conn = MakeConnectionImpl(db, mock);
internal::OptionsSpan span(MakeLimitedTimeOptions());
auto commit =
conn->Commit({spanner_internal::MakeSingleUseCommitTransaction({}),
spanner::Mutations{}, spanner::CommitOptions{}});
ASSERT_STATUS_OK(commit);
EXPECT_EQ(commit_timestamp, commit->commit_timestamp);
}

TEST(ConnectionImplTest, RollbackCreateSessionFailure) {
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
auto db = spanner::Database("project", "instance", "database");
Expand Down
9 changes: 9 additions & 0 deletions google/cloud/spanner/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ Transaction::Transaction(SingleUseOptions opts) {
std::move(selector), route_to_leader, std::string());
}

Transaction::Transaction(ReadWriteOptions opts, SingleUseCommitTag) {
google::spanner::v1::TransactionSelector selector;
*selector.mutable_single_use() = MakeOpts(std::move(opts.rw_opts_));
auto const route_to_leader = true; // write
impl_ = std::make_shared<spanner_internal::TransactionImpl>(
std::move(selector), route_to_leader,
std::move(opts.tag_).value_or(std::string()));
}

Transaction::Transaction(std::string session_id, std::string transaction_id,
bool route_to_leader, std::string transaction_tag) {
google::spanner::v1::TransactionSelector selector;
Expand Down
14 changes: 14 additions & 0 deletions google/cloud/spanner/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,12 @@ class Transaction {
private:
// Friendship for access by internal helpers.
friend struct spanner_internal::TransactionInternals;
struct SingleUseCommitTag {};

// Construction of a single-use transaction.
explicit Transaction(SingleUseOptions opts);
// Construction of a single-use commit transaction.
Transaction(ReadWriteOptions opts, SingleUseCommitTag);
// Construction of a transaction with existing IDs.
Transaction(std::string session_id, std::string transaction_id,
bool route_to_leader, std::string transaction_tag);
Expand Down Expand Up @@ -217,6 +220,12 @@ struct TransactionInternals {
return spanner::Transaction(std::move(su_opts));
}

static spanner::Transaction MakeSingleUseCommitTransaction(
spanner::Transaction::ReadWriteOptions opts) {
return spanner::Transaction(std::move(opts),
spanner::Transaction::SingleUseCommitTag{});
}

template <typename Functor>
// Pass `txn` by value, despite being used only once. This avoids the
// possibility of `txn` being destroyed by `f` before `Visit()` can
Expand All @@ -237,6 +246,11 @@ spanner::Transaction MakeSingleUseTransaction(T&& opts) {
return TransactionInternals::MakeSingleUseTransaction(std::forward<T>(opts));
}

inline spanner::Transaction MakeSingleUseCommitTransaction(
spanner::Transaction::ReadWriteOptions opts) {
return TransactionInternals::MakeSingleUseCommitTransaction(std::move(opts));
}

template <typename Functor>
// Pass `txn` by value, despite being used only once. This avoids the
// possibility of `txn` being destroyed by `f` before `Visit()` can
Expand Down