Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move session lastUseTime parameter from PooledSession to SessionImpl class. Fix updation of the parameter for chained RPCs within one transaction. #2704

Merged
merged 32 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
edc5bbf
fix: prevent illegal negative timeout values into thread sleep() meth…
arpan14 Feb 6, 2023
49a85df
Merge pull request #1 from arpan14/retryerror
arpan14 Feb 8, 2023
4cd497b
Fixing lint issues.
arpan14 Feb 8, 2023
4a6aa8e
Merge branch 'googleapis:main' into main
arpan14 Mar 13, 2023
b2aa09d
Merge branch 'googleapis:main' into main
arpan14 Mar 15, 2023
8d6d71e
Merge branch 'googleapis:main' into main
arpan14 May 9, 2023
77e6e7d
Merge branch 'googleapis:main' into main
arpan14 Jul 17, 2023
e8b7fad
Merge branch 'googleapis:main' into main
arpan14 Jul 25, 2023
8aa84e1
Merge branch 'googleapis:main' into main
arpan14 Oct 10, 2023
922f324
refactor: move session lastUseTime parameter from PooledSession to Se…
arpan14 Oct 17, 2023
b544080
chore: add clock instances in callees of SessionImpl.
arpan14 Oct 19, 2023
9862265
chore: partially fix failing unit tests in SessionPoolTest and Sessio…
arpan14 Oct 26, 2023
5e5f769
chore: fix failing tests in SessionPoolStressTest.
arpan14 Oct 27, 2023
a385ceb
chore: update lastUseTime for methods in SessionPoolTransactionContex…
arpan14 Oct 27, 2023
fd3bb41
chore: lint errors.
arpan14 Oct 27, 2023
4864053
chore: fix tests in DatabaseClientImplTest by passing the mocked cloc…
arpan14 Oct 27, 2023
f5b82fa
fix: update session lastUseTime field for AbstractReadContext class. …
arpan14 Oct 28, 2023
73f0192
fix: failing tests in TransactionRunnerImplTest.
arpan14 Oct 29, 2023
ff32178
fix: failing test in SessionPoolMaintainerTest.
arpan14 Oct 29, 2023
1316579
refactor: move FakeClock to a new class.
arpan14 Oct 29, 2023
80dd971
refactor: move Clock to a new class.
arpan14 Oct 30, 2023
1acd645
chore: resolving PR comments.
arpan14 Oct 31, 2023
6af8187
chore: address review comments.
arpan14 Oct 31, 2023
999a39b
chore: updating lastUseTime state in TransactionRunnerImpl. Removing …
arpan14 Oct 31, 2023
ec80d6a
chore: remove redundant update statements from SessionPool class. Add…
arpan14 Nov 1, 2023
6cdef81
chore: add more tests for TransactionRunner.
arpan14 Nov 1, 2023
593a10b
chore: remove dead code from constructor of SessionPoolTransactionCon…
arpan14 Nov 1, 2023
ced1e06
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Nov 1, 2023
0485aee
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Tr…
arpan14 Nov 2, 2023
c4163d8
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Tr…
arpan14 Nov 2, 2023
b75b19f
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Tr…
arpan14 Nov 2, 2023
86327e2
chore: fixing precondition errors due to null clock.
arpan14 Nov 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.24.0')
implementation platform('com.google.cloud:libraries-bom:26.26.0')

implementation 'com.google.cloud:google-cloud-spanner'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-spanner:6.49.0'
implementation 'com.google.cloud:google-cloud-spanner:6.52.1'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.49.0"
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.52.1"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -432,7 +432,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-spanner/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-spanner.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.49.0
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.52.1
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
Expand Down Expand Up @@ -72,6 +73,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadCon
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private ExecutorProvider executorProvider;
private Clock clock = new Clock();

Builder() {}

Expand Down Expand Up @@ -110,6 +112,11 @@ B setExecutorProvider(ExecutorProvider executorProvider) {
return self();
}

B setClock(Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
return self();
}

abstract T build();
}

Expand Down Expand Up @@ -392,6 +399,8 @@ void initTransaction() {
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

private final Clock clock;

@GuardedBy("lock")
private boolean isValid = true;

Expand All @@ -416,6 +425,7 @@ void initTransaction() {
this.defaultQueryOptions = builder.defaultQueryOptions;
this.span = builder.span;
this.executorProvider = builder.executorProvider;
this.clock = builder.clock;
}

@Override
Expand Down Expand Up @@ -689,6 +699,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
SpannerRpc.StreamingCall call =
rpc.executeQuery(
request.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
session.markUsed(clock.instant());
call.request(prefetchChunks);
stream.setCall(call, request.getTransaction().hasBegin());
return stream;
Expand Down Expand Up @@ -826,6 +837,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
SpannerRpc.StreamingCall call =
rpc.read(
builder.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
session.markUsed(clock.instant());
olavloite marked this conversation as resolved.
Show resolved Hide resolved
call.request(prefetchChunks);
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
return stream;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import org.threeten.bp.Instant;

/**
* Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8
* Clock.
*/
class Clock {
Instant instant() {
return Instant.now();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.threeten.bp.Instant;

/**
* Implementation of {@link Session}. Sessions are managed internally by the client library, and
Expand Down Expand Up @@ -98,12 +99,14 @@ interface SessionTransaction {
ByteString readyTransactionId;
private final Map<SpannerRpc.Option, ?> options;
private Span currentSpan;
private volatile Instant lastUseTime;

SessionImpl(SpannerImpl spanner, String name, Map<SpannerRpc.Option, ?> options) {
this.spanner = spanner;
this.options = options;
this.name = checkNotNull(name);
this.databaseId = SessionId.of(name).getDatabaseId();
this.lastUseTime = Instant.now();
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand All @@ -123,6 +126,14 @@ Span getCurrentSpan() {
return currentSpan;
}

Instant getLastUseTime() {
return lastUseTime;
}

void markUsed(Instant instant) {
lastUseTime = instant;
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
setActive(null);
Expand Down Expand Up @@ -385,6 +396,9 @@ ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions, boolean
}

TransactionContextImpl newTransaction(Options options) {
// A clock instance is passed in {@code SessionPoolOptions} in order to allow mocking via tests.
final Clock poolMaintainerClock =
spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock();
return TransactionContextImpl.newBuilder()
.setSession(this)
.setOptions(options)
Expand All @@ -396,6 +410,7 @@ TransactionContextImpl newTransaction(Options options) {
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setSpan(currentSpan)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.setClock(poolMaintainerClock == null ? new Clock() : poolMaintainerClock)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,6 @@ void maybeWaitOnMinSessions() {
}
}

/**
* Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8
* Clock.
*/
static class Clock {
Instant instant() {
return Instant.now();
}
}

private abstract static class CachedResultSetSupplier implements Supplier<ResultSet> {
private ResultSet cached;

Expand Down Expand Up @@ -1370,7 +1360,6 @@ PooledSession get(final boolean eligibleForLongRunning) {

class PooledSession implements Session {
@VisibleForTesting SessionImpl delegate;
private volatile Instant lastUseTime;
private volatile SpannerException lastException;
private volatile boolean allowReplacing = true;

Expand Down Expand Up @@ -1409,7 +1398,9 @@ class PooledSession implements Session {
private PooledSession(SessionImpl delegate) {
this.delegate = delegate;
this.state = SessionState.AVAILABLE;
this.lastUseTime = clock.instant();

// initialise the lastUseTime field for each session.
this.markUsed();
}

int getChannel() {
Expand Down Expand Up @@ -1631,7 +1622,7 @@ private void markClosing() {
}

void markUsed() {
lastUseTime = clock.instant();
delegate.markUsed(clock.instant());
}

@Override
Expand Down Expand Up @@ -1827,7 +1818,7 @@ private void removeIdleSessions(Instant currTime) {
Iterator<PooledSession> iterator = sessions.descendingIterator();
while (iterator.hasNext()) {
PooledSession session = iterator.next();
if (session.lastUseTime.isBefore(minLastUseTime)) {
if (session.delegate.getLastUseTime().isBefore(minLastUseTime)) {
if (session.state != SessionState.CLOSING) {
boolean isRemoved = removeFromPool(session);
if (isRemoved) {
Expand Down Expand Up @@ -1929,7 +1920,8 @@ private void removeLongRunningSessions(
// collection is populated only when the get() method in {@code PooledSessionFuture} is
// called.
final PooledSession session = sessionFuture.get();
final Duration durationFromLastUse = Duration.between(session.lastUseTime, currentTime);
final Duration durationFromLastUse =
Duration.between(session.delegate.getLastUseTime(), currentTime);
if (!session.eligibleForLongRunning
&& durationFromLastUse.compareTo(
inactiveTransactionRemovalOptions.getIdleTimeThreshold())
Expand Down Expand Up @@ -2327,7 +2319,7 @@ private PooledSession findSessionToKeepAlive(
&& (numChecked + numAlreadyChecked)
< (options.getMinSessions() + options.getMaxIdleSessions() - numSessionsInUse)) {
PooledSession session = iterator.next();
if (session.lastUseTime.isBefore(keepAliveThreshold)) {
if (session.delegate.getLastUseTime().isBefore(keepAliveThreshold)) {
iterator.remove();
return session;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.google.cloud.spanner;

import com.google.cloud.spanner.SessionPool.Clock;
import com.google.cloud.spanner.SessionPool.Position;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,19 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
static class Builder extends AbstractReadContext.Builder<Builder, TransactionContextImpl> {

private Clock clock = new Clock();
private ByteString transactionId;
private Options options;
private boolean trackTransactionStarter;

private Builder() {}

Builder setClock(Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
return self();
}

Builder setTransactionId(ByteString transactionId) {
this.transactionId = transactionId;
return self();
Expand Down Expand Up @@ -189,13 +196,15 @@ public void removeListener(Runnable listener) {
volatile ByteString transactionId;

private CommitResponse commitResponse;
private final Clock clock;

private TransactionContextImpl(Builder builder) {
super(builder);
this.transactionId = builder.transactionId;
this.trackTransactionStarter = builder.trackTransactionStarter;
this.options = builder.options;
this.finishedAsyncOperations.set(null);
this.clock = builder.clock;
}

@Override
Expand Down Expand Up @@ -389,6 +398,7 @@ public void run() {
tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span).startSpan();
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture =
rpc.commitAsync(commitRequest, session.getOptions());
session.markUsed(clock.instant());
commitFuture.addListener(
tracer.withSpan(
opSpan,
Expand Down Expand Up @@ -463,12 +473,15 @@ ApiFuture<Empty> rollbackAsync() {
// is still in flight. That transaction will then automatically be terminated by the server.
if (transactionId != null) {
span.addAnnotation("Starting Rollback");
return rpc.rollbackAsync(
RollbackRequest.newBuilder()
.setSession(session.getName())
.setTransactionId(transactionId)
.build(),
session.getOptions());
ApiFuture<Empty> apiFuture =
rpc.rollbackAsync(
RollbackRequest.newBuilder()
.setSession(session.getName())
.setTransactionId(transactionId)
.build(),
session.getOptions());
session.markUsed(clock.instant());
return apiFuture;
} else {
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
Expand Down Expand Up @@ -723,6 +736,7 @@ private ResultSet internalExecuteUpdate(
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader());
session.markUsed(clock.instant());
if (resultSet.getMetadata().hasTransaction()) {
onTransactionMetadata(
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
Expand Down Expand Up @@ -753,6 +767,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
// commit.
increaseAsyncOperations();
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), isRouteToLeader());
session.markUsed(clock.instant());
} catch (Throwable t) {
decreaseAsyncOperations();
throw t;
Expand Down Expand Up @@ -824,6 +839,7 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
try {
com.google.spanner.v1.ExecuteBatchDmlResponse response =
rpc.executeBatchDml(builder.build(), session.getOptions());
session.markUsed(clock.instant());
long[] results = new long[response.getResultSetsCount()];
for (int i = 0; i < response.getResultSetsCount(); ++i) {
results[i] = response.getResultSets(i).getStats().getRowCountExact();
Expand Down Expand Up @@ -863,6 +879,7 @@ public ApiFuture<long[]> batchUpdateAsync(
// commit.
increaseAsyncOperations();
response = rpc.executeBatchDmlAsync(builder.build(), session.getOptions());
session.markUsed(clock.instant());
} catch (Throwable t) {
decreaseAsyncOperations();
throw t;
Expand Down
Loading