From 8bfc3f000f9df6163a8e40ca4dcc6d59efab59e8 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 7 May 2024 17:13:55 -0400 Subject: [PATCH] fix: update grpc WriteObject response handling to provide context when a failure happens --- .../storage/AsyncStorageTaskException.java | 31 ++++ ...edChunkedResumableWritableByteChannel.java | 59 +++++-- .../ResumableSessionFailureScenario.java | 158 ++++++++++++++++-- .../cloud/storage/StorageException.java | 33 ++-- .../ResumableSessionFailureScenarioTest.java | 131 +++++++++++++++ 5 files changed, 368 insertions(+), 44 deletions(-) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncStorageTaskException.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncStorageTaskException.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncStorageTaskException.java new file mode 100644 index 0000000000..7d7b585db7 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/AsyncStorageTaskException.java @@ -0,0 +1,31 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +/** + * This exception is used to preserve the caller's stacktrace when invoking an async task in a sync + * context. It will be added as a suppressed exception when propagating the async exception. This + * allows callers to catch ApiException thrown in an async operation, while still maintaining the + * call site. + */ +public final class AsyncStorageTaskException extends RuntimeException { + // mimic of com.google.api.gax.rpc.AsyncTaskException which doesn't have a public constructor + // if that class is ever made public, make this class extend it + AsyncStorageTaskException() { + super("Asynchronous task failed"); + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java index bbcd986aaa..c3c3cd098b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java @@ -21,6 +21,7 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.api.gax.rpc.OutOfRangeException; @@ -208,14 +209,19 @@ private void flush( deps, alg, () -> { - Observer observer = new Observer(content, finalizing); + Observer observer = new Observer(content, finalizing, segments, internalContext); ApiStreamObserver write = callable.clientStreamingCall(observer); for (WriteObjectRequest message : segments) { write.onNext(message); } write.onCompleted(); - observer.await(); + try { + observer.await(); + } catch (Throwable t) { + t.addSuppressed(new AsyncStorageTaskException()); + throw t; + } return null; }, Decoder.identity()); @@ -230,13 +236,21 @@ class Observer implements ApiStreamObserver { private final RewindableContent content; private final boolean finalizing; + private final List segments; + private final GrpcCallContext context; private final SettableApiFuture invocationHandle; private volatile WriteObjectResponse last; - Observer(@Nullable RewindableContent content, boolean finalizing) { + Observer( + @Nullable RewindableContent content, + boolean finalizing, + @NonNull List segments, + GrpcCallContext context) { this.content = content; this.finalizing = finalizing; + this.segments = segments; + this.context = context; this.invocationHandle = SettableApiFuture.create(); } @@ -250,10 +264,20 @@ public void onError(Throwable t) { if (t instanceof OutOfRangeException) { OutOfRangeException oore = (OutOfRangeException) t; open = false; - invocationHandle.setException( - ResumableSessionFailureScenario.SCENARIO_5.toStorageException()); - } else { - invocationHandle.setException(t); + StorageException storageException = + ResumableSessionFailureScenario.SCENARIO_5.toStorageException( + segments, null, context, oore); + invocationHandle.setException(storageException); + } else if (t instanceof ApiException) { + // use StorageExceptions logic to translate from ApiException to our status codes ensuring + // things fall in line with our retry handlers. + // This is suboptimal, as it will initialize a second exception, however this is the + // unusual case, and it should not cause a significant overhead given its rarity. + StorageException tmp = StorageException.asStorageException((ApiException) t); + StorageException storageException = + ResumableSessionFailureScenario.toStorageException( + tmp.getCode(), tmp.getMessage(), tmp.getReason(), segments, null, context, t); + invocationHandle.setException(storageException); } } @@ -276,7 +300,8 @@ public void onCompleted() { writeCtx.getTotalSentBytes().set(persistedSize); writeCtx.getConfirmedBytes().set(persistedSize); } else { - throw ResumableSessionFailureScenario.SCENARIO_7.toStorageException(); + throw ResumableSessionFailureScenario.SCENARIO_7.toStorageException( + segments, last, context, null); } } else if (finalizing && last.hasResource()) { long totalSentBytes = writeCtx.getTotalSentBytes().get(); @@ -285,22 +310,28 @@ public void onCompleted() { writeCtx.getConfirmedBytes().set(finalSize); resultFuture.set(last); } else if (finalSize < totalSentBytes) { - throw ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException(); + throw ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException( + segments, last, context, null); } else { - throw ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException(); + throw ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException( + segments, last, context, null); } } else if (!finalizing && last.hasResource()) { - throw ResumableSessionFailureScenario.SCENARIO_1.toStorageException(); + throw ResumableSessionFailureScenario.SCENARIO_1.toStorageException( + segments, last, context, null); } else if (finalizing && last.hasPersistedSize()) { long totalSentBytes = writeCtx.getTotalSentBytes().get(); long persistedSize = last.getPersistedSize(); if (persistedSize < totalSentBytes) { - throw ResumableSessionFailureScenario.SCENARIO_3.toStorageException(); + throw ResumableSessionFailureScenario.SCENARIO_3.toStorageException( + segments, last, context, null); } else { - throw ResumableSessionFailureScenario.SCENARIO_2.toStorageException(); + throw ResumableSessionFailureScenario.SCENARIO_2.toStorageException( + segments, last, context, null); } } else { - throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException(); + throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException( + segments, last, context, null); } } catch (Throwable se) { open = false; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java index 0bccb2ff78..294d481cdc 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java @@ -16,19 +16,31 @@ package com.google.cloud.storage; +import static com.google.cloud.storage.Utils.ifNonNull; + import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpResponseException; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiException; import com.google.cloud.BaseServiceException; import com.google.cloud.storage.StorageException.IOExceptionCallable; import com.google.common.io.CharStreams; +import com.google.protobuf.MessageOrBuilder; +import com.google.storage.v2.ChecksummedData; +import com.google.storage.v2.ObjectChecksums; +import com.google.storage.v2.WriteObjectRequest; +import com.google.storage.v2.WriteObjectResponse; +import io.grpc.StatusRuntimeException; import java.io.IOException; import java.io.InputStreamReader; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Predicate; import javax.annotation.ParametersAreNonnullByDefault; +import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @ParametersAreNonnullByDefault @@ -69,6 +81,10 @@ enum ResumableSessionFailureScenario { private static final String PREFIX_I = "\t|< "; private static final String PREFIX_O = "\t|> "; private static final String PREFIX_X = "\t| "; + // define some constants for tab widths that are more compressed that the literals + private static final String T1 = "\t"; + private static final String T2 = "\t\t"; + private static final String T3 = "\t\t\t"; private static final Predicate includedHeaders = matches("Content-Length") @@ -78,6 +94,7 @@ enum ResumableSessionFailureScenario { .or(matches("Range")) .or(startsWith("X-Goog-Stored-")) .or(matches("X-Goog-GCS-Idempotency-Token")) + .or(matches("X-Goog-request-params")) .or(matches("X-GUploader-UploadID")); private static final Predicate> includeHeader = @@ -116,8 +133,12 @@ StorageException toStorageException( return toStorageException(code, message, reason, uploadId, resp, cause, contentCallable); } - StorageException toStorageException() { - return new StorageException(code, message, reason, null); + StorageException toStorageException( + @NonNull List<@NonNull WriteObjectRequest> reqs, + @Nullable WriteObjectResponse resp, + @NonNull GrpcCallContext context, + @Nullable Throwable cause) { + return toStorageException(code, message, reason, reqs, resp, context, cause); } static StorageException toStorageException( @@ -136,6 +157,102 @@ static StorageException toStorageException( return se; } + static StorageException toStorageException( + int code, + String message, + @Nullable String reason, + @NonNull List<@NonNull WriteObjectRequest> reqs, + @Nullable WriteObjectResponse resp, + @NonNull GrpcCallContext context, + @Nullable Throwable cause) { + final StringBuilder sb = new StringBuilder(); + sb.append(message); + // request context + Map> extraHeaders = context.getExtraHeaders(); + recordHeadersTo(extraHeaders, PREFIX_O, sb); + int length = reqs.size(); + for (int i = 0; i < length; i++) { + if (i == 0) { + sb.append("\n").append(PREFIX_O).append("["); + } else { + sb.append(","); + } + WriteObjectRequest req = reqs.get(i); + sb.append("\n").append(PREFIX_O).append(T1).append(req.getClass().getName()).append("{"); + if (req.hasUploadId()) { + sb.append("\n").append(PREFIX_O).append(T2).append("upload_id: ").append(req.getUploadId()); + } + long writeOffset = req.getWriteOffset(); + if (req.hasChecksummedData()) { + ChecksummedData checksummedData = req.getChecksummedData(); + sb.append("\n").append(PREFIX_O).append(T2); + sb.append( + String.format( + "checksummed_data: {range: [%d:%d]", + writeOffset, writeOffset + checksummedData.getContent().size())); + if (checksummedData.hasCrc32C()) { + sb.append(", crc32c: ").append(checksummedData.getCrc32C()); + } + sb.append("}"); + } else { + sb.append("\n").append(PREFIX_O).append(T2).append("write_offset: ").append(writeOffset); + } + if (req.getFinishWrite()) { + sb.append("\n").append(PREFIX_O).append(T2).append("finish_write: true"); + } + if (req.hasObjectChecksums()) { + ObjectChecksums objectChecksums = req.getObjectChecksums(); + sb.append("\n").append(PREFIX_O).append(T2).append("object_checksums: ").append("{"); + fmt(objectChecksums, PREFIX_O, T3, sb); + sb.append("\n").append(PREFIX_O).append(T2).append("}"); + } + sb.append("\n").append(PREFIX_O).append("\t}"); + if (i == length - 1) { + sb.append("\n").append(PREFIX_O).append("]"); + } + } + + sb.append("\n").append(PREFIX_X); + + // response context + if (resp != null) { + sb.append("\n").append(PREFIX_I).append(resp.getClass().getName()).append("{"); + fmt(resp, PREFIX_I, T1, sb); + sb.append("\n").append(PREFIX_I).append("}"); + sb.append("\n").append(PREFIX_X); + } + + if (cause != null) { + if (cause instanceof ApiException) { + ApiException apiException = (ApiException) cause; + Throwable cause1 = apiException.getCause(); + if (cause1 instanceof StatusRuntimeException) { + StatusRuntimeException statusRuntimeException = (StatusRuntimeException) cause1; + sb.append("\n").append(PREFIX_I).append(statusRuntimeException.getStatus()); + ifNonNull( + statusRuntimeException.getTrailers(), + t -> sb.append("\n").append(PREFIX_I).append(t)); + } else { + sb.append("\n") + .append(PREFIX_I) + .append("code: ") + .append(apiException.getStatusCode().toString()); + ifNonNull( + apiException.getReason(), + r -> sb.append("\n").append(PREFIX_I).append("reason: ").append(r)); + ifNonNull( + apiException.getDomain(), + d -> sb.append("\n").append(PREFIX_I).append("domain: ").append(d)); + ifNonNull( + apiException.getErrorDetails(), + e -> sb.append("\n").append(PREFIX_I).append("errorDetails: ").append(e)); + } + sb.append("\n").append(PREFIX_X); + } + } + return new StorageException(code, sb.toString(), reason, cause); + } + static StorageException toStorageException( int overrideCode, String message, @@ -213,14 +330,21 @@ private static Predicate startsWith(String prefix) { } private static void recordHeaderTo(HttpHeaders h, String prefix, StringBuilder sb) { - h.entrySet().stream() - .filter(includeHeader) - .forEach( - e -> { - String key = e.getKey(); - String value = headerValueToString(e.getValue()); - sb.append("\n").append(prefix).append(key).append(": ").append(value); - }); + h.entrySet().stream().filter(includeHeader).forEach(writeHeaderValue(prefix, sb)); + } + + private static void recordHeadersTo( + Map> headers, String prefix, StringBuilder sb) { + headers.entrySet().stream().filter(includeHeader).forEach(writeHeaderValue(prefix, sb)); + } + + private static Consumer> writeHeaderValue( + String prefix, StringBuilder sb) { + return e -> { + String key = e.getKey(); + String value = headerValueToString(e.getValue()); + sb.append("\n").append(prefix).append(key).append(": ").append(value); + }; } private static String headerValueToString(Object o) { @@ -233,4 +357,18 @@ private static String headerValueToString(Object o) { return o.toString(); } + + private static void fmt( + MessageOrBuilder msg, + @SuppressWarnings("SameParameterValue") String prefix, + String indentation, + StringBuilder sb) { + String string = msg.toString(); + // drop the final new line before prefixing + string = string.replaceAll("\n$", ""); + sb.append("\n") + .append(prefix) + .append(indentation) + .append(string.replaceAll("\r?\n", "\n" + prefix + indentation)); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageException.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageException.java index eeb24a44b8..63708115fd 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageException.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageException.java @@ -139,13 +139,19 @@ static StorageException asStorageException(ApiException apiEx) { } // If there is a gRPC exception in our cause change pull it's error message up to be our // message otherwise, create a generic error message with the status code. - String statusCodeName = statusCode.getCode().name(); - String statusExceptionMessage = getStatusExceptionMessage(apiEx); - - String message; - if (statusExceptionMessage != null) { - message = statusCodeName + ": " + statusExceptionMessage; - } else { + String message = null; + if (apiEx.getCause() != null) { + Throwable cause = apiEx.getCause(); + if (cause instanceof StatusRuntimeException || cause instanceof StatusException) { + message = cause.getMessage(); + } + // if not a grpc exception fall through to the default handling + } + if (message == null && apiEx.getMessage() != null) { + message = apiEx.getMessage(); + } + if (message == null) { + String statusCodeName = statusCode.getCode().name(); message = "Error: " + statusCodeName; } @@ -198,19 +204,6 @@ static T wrapFutureGet(ApiFuture f) { } } - @Nullable - private static String getStatusExceptionMessage(Exception apiEx) { - if (apiEx.getMessage() != null) { - return apiEx.getMessage(); - } else { - Throwable cause = apiEx.getCause(); - if (cause instanceof StatusRuntimeException || cause instanceof StatusException) { - return cause.getMessage(); - } - return null; - } - } - @Nullable private static ApiException asApiExceptionOrNull(Throwable cause) { if (cause instanceof ApiException) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ResumableSessionFailureScenarioTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ResumableSessionFailureScenarioTest.java index 0a53a4bbc6..4c17e19c44 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ResumableSessionFailureScenarioTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ResumableSessionFailureScenarioTest.java @@ -16,8 +16,12 @@ package com.google.cloud.storage; +import static com.google.cloud.storage.ByteSizeConstants._256KiB; +import static com.google.cloud.storage.ByteSizeConstants._512KiB; +import static com.google.cloud.storage.ResumableSessionFailureScenario.SCENARIO_1; import static com.google.cloud.storage.ResumableSessionFailureScenario.isContinue; import static com.google.cloud.storage.ResumableSessionFailureScenario.isOk; +import static com.google.cloud.storage.TestUtils.assertAll; import static com.google.common.truth.Truth.assertThat; import com.google.api.client.http.EmptyContent; @@ -26,8 +30,24 @@ import com.google.api.client.http.HttpResponse; import com.google.api.client.json.gson.GsonFactory; import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ApiExceptionFactory; import com.google.api.services.storage.model.StorageObject; +import com.google.cloud.storage.it.ChecksummedTestContent; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.protobuf.ByteString; +import com.google.storage.v2.ChecksummedData; +import com.google.storage.v2.Object; +import com.google.storage.v2.ObjectChecksums; +import com.google.storage.v2.WriteObjectRequest; +import com.google.storage.v2.WriteObjectResponse; +import io.grpc.Metadata; +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; +import io.grpc.internal.GrpcUtil; import java.io.IOException; import java.math.BigInteger; import java.nio.charset.StandardCharsets; @@ -158,6 +178,117 @@ public void xGoogGcsIdempotencyTokenHeadersIncludedIfPresent() throws IOExceptio assertThat(storageException).hasMessageThat().contains("|< x-goog-gcs-idempotency-token: 5"); } + @Test + public void grpc_response() throws Exception { + ChecksummedTestContent content = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(_256KiB)); + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId("uploadId") + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) + .build()) + .build(); + WriteObjectRequest req2 = + WriteObjectRequest.newBuilder() + .setWriteOffset(_256KiB) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent(ByteString.copyFrom(content.getBytes())) + .setCrc32C(content.getCrc32c()) + .build()) + .build(); + WriteObjectRequest req3 = + WriteObjectRequest.newBuilder() + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .setObjectChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(345) + .setMd5Hash(ByteString.copyFromUtf8("asdf")) + .build()) + .build(); + WriteObjectResponse resp1 = + WriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("obj").setSize(_512KiB).build()) + .build(); + GrpcCallContext context = + GrpcCallContext.createDefault() + .withExtraHeaders( + ImmutableMap.of( + "x-goog-request-params", + ImmutableList.of("bucket=projects/_/bucket/bucket-name"))); + StorageException se = + SCENARIO_1.toStorageException(ImmutableList.of(req1, req2, req3), resp1, context, null); + assertAll( + () -> + assertThat(se) + .hasMessageThat() + .contains("x-goog-request-params: bucket=projects/_/bucket/bucket-name"), + () -> assertThat(se).hasMessageThat().contains("upload_id: "), + () -> assertThat(se).hasMessageThat().contains("0:262144"), + () -> assertThat(se).hasMessageThat().contains(", crc32c: "), // from ChecksummedData + () -> assertThat(se).hasMessageThat().contains("write_offset: "), + () -> assertThat(se).hasMessageThat().contains("finish_write: "), + () -> assertThat(se).hasMessageThat().contains("object_checksums: "), + () -> assertThat(se).hasMessageThat().contains("crc32c: "), // from object_checksums + () -> assertThat(se).hasMessageThat().contains("md5_hash: "), + () -> assertThat(se).hasMessageThat().contains("resource {")); + } + + @Test + public void grpc_apiException() throws Exception { + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId("uploadId") + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) + .build()) + .build(); + GrpcCallContext context = Retrying.newCallContext(); + Code code = Code.FAILED_PRECONDITION; + Metadata trailers = new Metadata(); + trailers.put(GrpcUtil.USER_AGENT_KEY, "test-class/"); + StatusRuntimeException statusRuntimeException = + code.toStatus().withDescription("precondition did not hold").asRuntimeException(trailers); + ApiException apiException = + ApiExceptionFactory.createException(statusRuntimeException, GrpcStatusCode.of(code), true); + + StorageException se = + SCENARIO_1.toStorageException(ImmutableList.of(req1), null, context, apiException); + assertAll( + () -> assertThat(se).hasMessageThat().contains("upload_id: "), + () -> assertThat(se).hasMessageThat().contains("0:262144"), + () -> assertThat(se).hasMessageThat().doesNotContain("WriteObjectResponse"), + () -> assertThat(se).hasMessageThat().contains("Status{code=FAILED_PRECONDITION"), + () -> assertThat(se).hasMessageThat().contains("user-agent=test-class/")); + } + + @Test + public void grpc_nonApiException() throws Exception { + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId("uploadId") + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) + .build()) + .build(); + GrpcCallContext context = Retrying.newCallContext(); + Cause cause = new Cause(); + StorageException se = + SCENARIO_1.toStorageException(ImmutableList.of(req1), null, context, cause); + assertAll( + () -> assertThat(se).hasMessageThat().contains("upload_id: "), + () -> assertThat(se).hasMessageThat().contains("0:262144"), + () -> assertThat(se).hasMessageThat().doesNotContain("WriteObjectResponse")); + } + private static final class Cause extends RuntimeException { private Cause() {